Skip to content

Commit

Permalink
Add tests for idle timeout and max lifetime
Browse files Browse the repository at this point in the history
Also fix a bug where the executor's queue was being cleared whenever a
PooledConnection dropped.
  • Loading branch information
sfackler committed Jan 7, 2016
1 parent 79bd6f5 commit 51c6aef
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 20 deletions.
18 changes: 9 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ struct SharedPool<M> where M: ManageConnection {
thread_pool: ScheduledThreadPool,
}

impl<M> Drop for SharedPool<M> where M: ManageConnection {
fn drop(&mut self) {
self.thread_pool.clear();
}
}

fn drop_conn<M>(shared: &Arc<SharedPool<M>>,
internals: &mut PoolInternals<M::Connection>)
where M: ManageConnection
Expand Down Expand Up @@ -245,12 +251,6 @@ pub struct Pool<M> where M: ManageConnection {
shared: Arc<SharedPool<M>>,
}

impl<M> Drop for Pool<M> where M: ManageConnection {
fn drop(&mut self) {
self.shared.thread_pool.clear();
}
}

/// Returns a new `Pool` referencing the same state as `self`.
impl<M> Clone for Pool<M> where M: ManageConnection {
fn clone(&self) -> Pool<M> {
Expand Down Expand Up @@ -310,13 +310,13 @@ impl<M> Pool<M> where M: ManageConnection {
/// connections.
pub fn new(config: Config<M::Connection, M::Error>, manager: M)
-> Result<Pool<M>, InitializationError> {
Pool::new_inner(config, manager, Duration::seconds(30))
Pool::new_inner(config, manager, 30)
}

// for testing
fn new_inner(config: Config<M::Connection, M::Error>,
manager: M,
reaper_rate: Duration)
reaper_rate: i64)
-> Result<Pool<M>, InitializationError> {
let internals = PoolInternals {
conns: VecDeque::with_capacity(config.pool_size() as usize),
Expand Down Expand Up @@ -357,7 +357,7 @@ impl<M> Pool<M> where M: ManageConnection {

if shared.config.max_lifetime().is_some() || shared.config.idle_timeout().is_some() {
let s = shared.clone();
shared.thread_pool.run_at_fixed_rate(reaper_rate,
shared.thread_pool.run_at_fixed_rate(Duration::seconds(reaper_rate),
move || reap_connections(&s));
}

Expand Down
25 changes: 15 additions & 10 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,8 @@ impl ScheduledThreadPool {
shared: Arc::new(shared),
};

for _ in 0..size {
let mut worker = Worker {
shared: pool.shared.clone(),
};

thread::spawn(move || worker.run());
for i in 0..size {
Worker::start(i, pool.shared.clone());
}

pool
Expand Down Expand Up @@ -136,22 +132,31 @@ impl ScheduledThreadPool {
}

struct Worker {
i: usize,
shared: Arc<SharedPool>,
}

impl Drop for Worker {
fn drop(&mut self) {
// Start up a new worker if this one's going away due to a panic from a job
if thread::panicking() {
let mut worker = Worker {
shared: self.shared.clone(),
};
thread::spawn(move || worker.run());
Worker::start(self.i, self.shared.clone());
}
}
}

impl Worker {
fn start(i: usize, shared: Arc<SharedPool>) {
let mut worker = Worker {
i: i,
shared: shared,
};
thread::Builder::new()
.name(format!("ScheduledThreadPool worker {}", i))
.spawn(move || worker.run())
.unwrap();
}

fn run(&mut self) {
loop {
match self.get_job() {
Expand Down
100 changes: 99 additions & 1 deletion src/test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Ordering};
use std::sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, AtomicUsize, ATOMIC_USIZE_INIT, AtomicIsize,
Ordering};
use std::sync::mpsc::{self, SyncSender, Receiver};
use std::sync::{Mutex, Arc};
use std::time::Duration;
Expand Down Expand Up @@ -256,3 +257,100 @@ fn test_connection_customizer() {
assert_eq!(1, conn.0);
assert!(DROPPED.load(Ordering::SeqCst));
}

#[test]
fn test_idle_timeout() {
static DROPPED: AtomicUsize = ATOMIC_USIZE_INIT;

struct Connection;

impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

struct Handler(AtomicIsize);

impl ManageConnection for Handler {
type Connection = Connection;
type Error = ();

fn connect(&self) -> Result<Connection, ()> {
if self.0.fetch_sub(1, Ordering::SeqCst) > 0 {
Ok(Connection)
} else {
Err(())
}
}

fn is_valid(&self, _: &mut Connection) -> Result<(), ()> {
Ok(())
}

fn has_broken(&self, _: &mut Connection) -> bool {
false
}
}

let config = Config::builder()
.pool_size(5)
.idle_timeout(Some(Duration::from_secs(1)))
.build();
let pool = Pool::new_inner(config, Handler(AtomicIsize::new(5)), 1).unwrap();
let conn = pool.get().unwrap();
thread::sleep(Duration::from_secs(2));
assert_eq!(4, DROPPED.load(Ordering::SeqCst));
drop(conn);
assert_eq!(4, DROPPED.load(Ordering::SeqCst));
}

#[test]
fn test_max_lifetime() {
static DROPPED: AtomicUsize = ATOMIC_USIZE_INIT;

struct Connection;

impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

struct Handler(AtomicIsize);

impl ManageConnection for Handler {
type Connection = Connection;
type Error = ();

fn connect(&self) -> Result<Connection, ()> {
if self.0.fetch_sub(1, Ordering::SeqCst) > 0 {
Ok(Connection)
} else {
Err(())
}
}

fn is_valid(&self, _: &mut Connection) -> Result<(), ()> {
Ok(())
}

fn has_broken(&self, _: &mut Connection) -> bool {
false
}
}

let config = Config::builder()
.pool_size(5)
.max_lifetime(Some(Duration::from_secs(1)))
.connection_timeout(Duration::from_secs(1))
.build();
let pool = Pool::new_inner(config, Handler(AtomicIsize::new(5)), 1).unwrap();
let conn = pool.get().unwrap();
thread::sleep(Duration::from_secs(2));
assert_eq!(4, DROPPED.load(Ordering::SeqCst));
drop(conn);
thread::sleep(Duration::from_secs(2));
assert_eq!(5, DROPPED.load(Ordering::SeqCst));
assert!(pool.get().is_err());
}

0 comments on commit 51c6aef

Please sign in to comment.