diff --git a/src/config.rs b/src/config.rs
index e8be9477..219f0deb 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -264,7 +264,6 @@ pub enum PoolMode {
     #[serde(alias = "session", alias = "Session")]
     Session,
 }
-
 impl ToString for PoolMode {
     fn to_string(&self) -> String {
         match *self {
@@ -274,11 +273,33 @@ impl ToString for PoolMode {
     }
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Copy, Hash)]
+pub enum LoadBalancingMode {
+    #[serde(alias = "random", alias = "Random")]
+    Random,
+
+    #[serde(alias = "loc", alias = "LOC", alias = "least_outstanding_connections")]
+    LeastOutstandingConnections,
+}
+impl ToString for LoadBalancingMode {
+    fn to_string(&self) -> String {
+        match *self {
+            LoadBalancingMode::Random => "random".to_string(),
+            LoadBalancingMode::LeastOutstandingConnections => {
+                "least_outstanding_connections".to_string()
+            }
+        }
+    }
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
 pub struct Pool {
     #[serde(default = "Pool::default_pool_mode")]
     pub pool_mode: PoolMode,
 
+    #[serde(default = "Pool::default_load_balancing_mode")]
+    pub load_balancing_mode: LoadBalancingMode,
+
     pub default_role: String,
 
     #[serde(default)] // False
@@ -305,6 +326,10 @@ impl Pool {
         PoolMode::Transaction
     }
 
+    pub fn default_load_balancing_mode() -> LoadBalancingMode {
+        LoadBalancingMode::Random
+    }
+
     pub fn default_automatic_sharding_key() -> Option<String> {
         None
     }
@@ -345,6 +370,7 @@ impl Default for Pool {
     fn default() -> Pool {
         Pool {
             pool_mode: Self::default_pool_mode(),
+            load_balancing_mode: Self::default_load_balancing_mode(),
             shards: BTreeMap::from([(String::from("1"), Shard::default())]),
             users: BTreeMap::default(),
             default_role: String::from("any"),
@@ -471,6 +497,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
                     format!("pools.{}.pool_mode", pool_name),
                     pool.pool_mode.to_string(),
                 ),
+                (
+                    format!("pools.{}.load_balancing_mode", pool_name),
+                    pool.load_balancing_mode.to_string(),
+                ),
                 (
                     format!("pools.{}.primary_reads_enabled", pool_name),
                     pool.primary_reads_enabled.to_string(),
                 ),
@@ -594,6 +624,10 @@ impl Config {
                 "[pool: {}] Pool mode: {:?}",
                 pool_name, pool_config.pool_mode
             );
+            info!(
+                "[pool: {}] Load Balancing mode: {:?}",
+                pool_name, pool_config.load_balancing_mode
+            );
             let connect_timeout = match pool_config.connect_timeout {
                 Some(connect_timeout) => connect_timeout,
                 None => self.general.connect_timeout,
diff --git a/src/pool.rs b/src/pool.rs
index 94f69627..82720aaa 100644
--- a/src/pool.rs
+++ b/src/pool.rs
@@ -12,7 +12,7 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Instant;
 
-use crate::config::{get_config, Address, General, PoolMode, Role, User};
+use crate::config::{get_config, Address, General, LoadBalancingMode, PoolMode, Role, User};
 use crate::errors::Error;
 use crate::server::Server;
@@ -62,6 +62,9 @@ pub struct PoolSettings {
     /// Transaction or Session.
     pub pool_mode: PoolMode,
 
+    /// Random or LeastOutstandingConnections.
+    pub load_balancing_mode: LoadBalancingMode,
+
     // Number of shards.
     pub shards: usize,
 
@@ -94,6 +97,7 @@ impl Default for PoolSettings {
     fn default() -> PoolSettings {
         PoolSettings {
             pool_mode: PoolMode::Transaction,
+            load_balancing_mode: LoadBalancingMode::Random,
             shards: 1,
             user: User::default(),
             default_role: None,
@@ -257,6 +261,7 @@ impl ConnectionPool {
             server_info: BytesMut::new(),
             settings: PoolSettings {
                 pool_mode: pool_config.pool_mode,
+                load_balancing_mode: pool_config.load_balancing_mode,
                 // shards: pool_config.shards.clone(),
                 shards: shard_ids.len(),
                 user: user.clone(),
@@ -356,8 +361,17 @@ impl ConnectionPool {
             .filter(|address| address.role == role)
             .collect();
 
-        // Random load balancing
+        // We shuffle even if least_outstanding_queries is used to avoid imbalance
+        // in cases where all candidates have more or less the same number of outstanding
+        // queries
         candidates.shuffle(&mut thread_rng());
+        if self.settings.load_balancing_mode == LoadBalancingMode::LeastOutstandingConnections {
+            candidates.sort_by(|a, b| {
+                self.busy_connection_count(b)
+                    .partial_cmp(&self.busy_connection_count(a))
+                    .unwrap()
+            });
+        }
 
         while !candidates.is_empty() {
             // Get the next candidate
@@ -565,6 +579,20 @@ impl ConnectionPool {
     pub fn server_info(&self) -> BytesMut {
         self.server_info.clone()
     }
+
+    fn busy_connection_count(&self, address: &Address) -> u32 {
+        let state = self.pool_state(address.shard, address.address_index);
+        let idle = state.idle_connections;
+        let provisioned = state.connections;
+
+        if idle > provisioned {
+            // Unlikely but avoids an overflow panic if this ever happens
+            return 0;
+        }
+        let busy = provisioned - idle;
+        debug!("{:?} has {:?} busy connections", address, busy);
+        return busy;
+    }
 }
 
 /// Wrapper for the bb8 connection pool.
diff --git a/src/query_router.rs b/src/query_router.rs
index 50905716..03f46019 100644
--- a/src/query_router.rs
+++ b/src/query_router.rs
@@ -768,6 +768,7 @@ mod test {
         let pool_settings = PoolSettings {
             pool_mode: PoolMode::Transaction,
+            load_balancing_mode: crate::config::LoadBalancingMode::Random,
             shards: 2,
             user: crate::config::User::default(),
             default_role: Some(Role::Replica),
diff --git a/tests/ruby/helpers/pgcat_helper.rb b/tests/ruby/helpers/pgcat_helper.rb
index 55847ed6..ffa60953 100644
--- a/tests/ruby/helpers/pgcat_helper.rb
+++ b/tests/ruby/helpers/pgcat_helper.rb
@@ -5,7 +5,7 @@
 
 module Helpers
   module Pgcat
-    def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
+    def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
       user = {
         "password" => "sharding_user",
         "pool_size" => pool_size,
@@ -23,6 +23,7 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
         "#{pool_name}" => {
           "default_role" => "any",
           "pool_mode" => pool_mode,
+          "load_balancing_mode" => lb_mode,
           "primary_reads_enabled" => false,
           "query_parser_enabled" => false,
           "sharding_function" => "pg_bigint_hash",
@@ -46,7 +47,7 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
       end
     end
 
-    def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
+    def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
       user = {
         "password" => "sharding_user",
         "pool_size" => pool_size,
@@ -64,6 +65,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
         "#{pool_name}" => {
           "default_role" => "primary",
           "pool_mode" => pool_mode,
+          "load_balancing_mode" => lb_mode,
           "primary_reads_enabled" => false,
           "query_parser_enabled" => false,
           "sharding_function" => "pg_bigint_hash",
@@ -90,7 +92,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction")
       end
     end
 
-    def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
+    def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
       user = {
         "password" => "sharding_user",
         "pool_size" => pool_size,
@@ -111,6 +113,7 @@ def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
         "#{pool_name}" => {
           "default_role" => "any",
           "pool_mode" => pool_mode,
+          "load_balancing_mode" => lb_mode,
           "primary_reads_enabled" => false,
           "query_parser_enabled" => false,
           "sharding_function" => "pg_bigint_hash",
diff --git a/tests/ruby/load_balancing_spec.rb b/tests/ruby/load_balancing_spec.rb
index bd98a831..8be066df 100644
--- a/tests/ruby/load_balancing_spec.rb
+++ b/tests/ruby/load_balancing_spec.rb
@@ -1,7 +1,7 @@
 # frozen_string_literal: true
 require_relative 'spec_helper'
 
-describe "Load Balancing" do
+describe "Random Load Balancing" do
   let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
   after do
     processes.all_databases.map(&:reset)
@@ -59,3 +59,107 @@
   end
 end
 
+describe "Least Outstanding Queries Load Balancing" do
+  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "loc") }
+  after do
+    processes.all_databases.map(&:reset)
+    processes.pgcat.shutdown
+  end
+
+  context "under homogeneous load" do
+    it "balances query volume between all instances" do
+      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
+
+      query_count = QUERY_COUNT
+      expected_share = query_count / processes.all_databases.count
+      failed_count = 0
+
+      query_count.times do
+        conn.async_exec("SELECT 1 + 2")
+      rescue
+        failed_count += 1
+      end
+
+      expect(failed_count).to eq(0)
+      processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
+        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
+      end
+    end
+  end
+
+  context "under heterogeneous load" do
+    it "balances query volume between all instances based on how busy they are" do
+      slow_query_count = 2
+      threads = Array.new(slow_query_count) do
+        Thread.new do
+          conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
+          conn.async_exec("SELECT pg_sleep(1)")
+        end
+      end
+
+      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
+
+      query_count = QUERY_COUNT
+      expected_share = query_count / (processes.all_databases.count - slow_query_count)
+      failed_count = 0
+
+      query_count.times do
+        conn.async_exec("SELECT 1 + 2")
+      rescue
+        failed_count += 1
+      end
+
+      expect(failed_count).to eq(0)
+      # Under LOQ, we expect replicas running the slow pg_sleep
+      # to get no selects
+      expect(
+        processes.
+          all_databases.
+          map(&:count_select_1_plus_2).
+          count { |instance_share| instance_share == 0 }
+      ).to eq(slow_query_count)
+
+      # We also expect the quick queries to be spread across
+      # the idle servers only
+      processes.
+        all_databases.
+        map(&:count_select_1_plus_2).
+        reject { |instance_share| instance_share == 0 }.
+        each do |instance_share|
+          expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
+        end
+
+      threads.map(&:join)
+    end
+  end
+
+  context "when some replicas are down" do
+    it "balances query volume between working instances" do
+      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
+      expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
+      failed_count = 0
+
+      processes[:replicas][0].take_down do
+        processes[:replicas][1].take_down do
+          QUERY_COUNT.times do
+            conn.async_exec("SELECT 1 + 2")
+          rescue
+            conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
+            failed_count += 1
+          end
+        end
+      end
+
+      expect(failed_count).to eq(2)
+      processes.all_databases.each do |instance|
+        queries_routed = instance.count_select_1_plus_2
+        if processes.replicas[0..1].include?(instance)
+          expect(queries_routed).to eq(0)
+        else
+          expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
+        end
+      end
+    end
+  end
+end
+
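Configuration note (not part of the patch): a minimal sketch of how the new option might be set in pgcat.toml, assuming the [pools.<name>] TOML layout implied by the pools.{}.load_balancing_mode keys emitted in config.rs above. The pool name "sharded_db" and the pool_mode line are illustrative only; accepted values follow the serde aliases introduced in this diff ("random"/"Random" and "loc"/"LOC"/"least_outstanding_connections").

    [pools.sharded_db]
    pool_mode = "transaction"
    # New in this patch; omitting it falls back to Pool::default_load_balancing_mode(), i.e. "random"
    load_balancing_mode = "loc"

With "loc" selected, the pool.rs change above still shuffles the candidate servers before sorting them by busy connection count, so ties between equally loaded servers are broken randomly rather than always favoring the same replica.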