Skip to content

Commit

Permalink
Use stdlib Instant
Browse files Browse the repository at this point in the history
  • Loading branch information
sfackler committed Jan 7, 2016
1 parent 9339a6a commit 6b1f1dc
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 56 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@ keywords = ["database", "pool"]

[dependencies]
log = "0.3"
time = "0.1.22"
63 changes: 30 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
//! ```
#![warn(missing_docs)]
#![doc(html_root_url="https://sfackler.github.io/r2d2/doc/v0.6.1")]
#![feature(time2)]

#[macro_use]
extern crate log;
extern crate time;

use std::cmp;
use std::collections::VecDeque;
Expand All @@ -52,7 +52,7 @@ use std::fmt;
use std::ops::{Deref, DerefMut};
use std::mem;
use std::sync::{Arc, Mutex, Condvar};
use time::{Duration, SteadyTime};
use std::time::{Duration, Instant};

#[doc(inline)]
pub use config::Config;
Expand Down Expand Up @@ -143,16 +143,17 @@ impl<C, E> CustomizeConnection<C, E> for NopConnectionCustomizer {}

struct Conn<C> {
conn: C,
birth: SteadyTime,
idle_start: SteadyTime,
birth: Instant,
idle_start: Instant,
}

impl<C> Conn<C> {
fn new(conn: C) -> Conn<C> {
let now = Instant::now();
Conn {
conn: conn,
birth: SteadyTime::now(),
idle_start: SteadyTime::now(),
birth: now,
idle_start: now,
}
}
}
Expand Down Expand Up @@ -194,7 +195,7 @@ fn add_connection<M>(shared: &Arc<SharedPool<M>>,
where M: ManageConnection
{
internals.pending_conns += 1;
inner(Duration::zero(), shared);
inner(Duration::from_secs(0), shared);

fn inner<M>(delay: Duration, shared: &Arc<SharedPool<M>>) where M: ManageConnection {
let new_shared = shared.clone();
Expand All @@ -213,8 +214,8 @@ fn add_connection<M>(shared: &Arc<SharedPool<M>>,
}
Err(err) => {
shared.config.error_handler().handle_error(err);
let delay = cmp::max(Duration::milliseconds(200), delay);
let delay = cmp::min(cvt(shared.config.connection_timeout()) / 2, delay * 2);
let delay = cmp::max(Duration::from_millis(200), delay);
let delay = cmp::min(shared.config.connection_timeout() / 2, delay * 2);
inner(delay, &shared);
},
}
Expand All @@ -228,14 +229,14 @@ fn reap_connections<M>(shared: &Arc<SharedPool<M>>) where M: ManageConnection {

let mut internals = shared.internals.lock().unwrap();
mem::swap(&mut old, &mut internals.conns);
let now = SteadyTime::now();
let now = Instant::now();
for conn in old {
let mut reap = false;
if let Some(timeout) = shared.config.idle_timeout() {
reap |= now - conn.idle_start >= cvt(timeout);
reap |= conn.idle_start + timeout <= now;
}
if let Some(lifetime) = shared.config.max_lifetime() {
reap |= now - conn.birth >= cvt(lifetime);
reap |= conn.birth + lifetime <= now;
}
if reap {
drop_conn(shared, &mut internals);
Expand Down Expand Up @@ -317,7 +318,7 @@ impl<M> Pool<M> where M: ManageConnection {
// for testing
fn new_inner(config: Config<M::Connection, M::Error>,
manager: M,
reaper_rate: i64)
reaper_rate: u64)
-> Result<Pool<M>, InitializationError> {
let internals = PoolInternals {
conns: VecDeque::with_capacity(config.pool_size() as usize),
Expand All @@ -342,23 +343,22 @@ impl<M> Pool<M> where M: ManageConnection {
}

if shared.config.initialization_fail_fast() {
let end = SteadyTime::now() + cvt(shared.config.connection_timeout());
let end = Instant::now() + shared.config.connection_timeout();
let mut internals = shared.internals.lock().unwrap();

while internals.num_conns != shared.config.pool_size() {
let wait = end - SteadyTime::now();
if wait <= Duration::zero() {
let now = Instant::now();
if now >= end {
return Err(InitializationError(()));
}
internals = shared.cond.wait_timeout_ms(internals,
wait.num_milliseconds() as u32)
.unwrap().0;
let wait = end.duration_from_earlier(now);
internals = shared.cond.wait_timeout(internals, wait).unwrap().0;
}
}

if shared.config.max_lifetime().is_some() || shared.config.idle_timeout().is_some() {
let s = shared.clone();
shared.thread_pool.run_at_fixed_rate(Duration::seconds(reaper_rate),
shared.thread_pool.run_at_fixed_rate(Duration::from_secs(reaper_rate),
move || reap_connections(&s));
}

Expand All @@ -368,7 +368,7 @@ impl<M> Pool<M> where M: ManageConnection {
}

fn get_inner(&self) -> Result<Conn<M::Connection>, GetTimeout> {
let end = SteadyTime::now() + cvt(self.shared.config.connection_timeout());
let end = Instant::now() + self.shared.config.connection_timeout();
let mut internals = self.shared.internals.lock().unwrap();

loop {
Expand All @@ -392,16 +392,17 @@ impl<M> Pool<M> where M: ManageConnection {
add_connection(&self.shared, &mut internals);
}

let now = SteadyTime::now();
let mut timeout = (end - now).num_milliseconds();
if timeout < 0 {
timeout = 0
let now = Instant::now();
let timeout = if now > end {
Duration::from_secs(0)
} else {
end.duration_from_earlier(now)
};
let (new_internals, no_timeout) =
self.shared.cond.wait_timeout_ms(internals, timeout as u32).unwrap();
let (new_internals, result) =
self.shared.cond.wait_timeout(internals, timeout).unwrap();
internals = new_internals;

if !no_timeout {
if result.timed_out() {
return Err(GetTimeout(()));
}
}
Expand All @@ -428,7 +429,7 @@ impl<M> Pool<M> where M: ManageConnection {
if broken {
drop_conn(&self.shared, &mut internals);
} else {
conn.idle_start = SteadyTime::now();
conn.idle_start = Instant::now();
internals.conns.push_back(conn);
self.shared.cond.notify_one();
}
Expand Down Expand Up @@ -470,7 +471,3 @@ impl<M> DerefMut for PooledConnection<M> where M: ManageConnection {
&mut self.conn.as_mut().unwrap().conn
}
}

fn cvt(d: std::time::Duration) -> Duration {
Duration::seconds(d.as_secs() as i64) + Duration::nanoseconds(d.subsec_nanos() as i64)
}
37 changes: 15 additions & 22 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use std::collections::BinaryHeap;
use std::cmp::{PartialOrd, Ord, PartialEq, Eq, Ordering};
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use time::Duration;
use time;
use std::time::{Duration, Instant};

use thunk::Thunk;

Expand All @@ -17,7 +16,7 @@ enum JobType {

struct Job {
type_: JobType,
time: u64,
time: Instant,
}

impl PartialOrd for Job {
Expand Down Expand Up @@ -107,21 +106,21 @@ impl ScheduledThreadPool {

#[allow(dead_code)]
pub fn run<F>(&self, job: F) where F: FnOnce() + Send + 'static {
self.run_after(Duration::zero(), job)
self.run_after(Duration::from_secs(0), job)
}

pub fn run_after<F>(&self, dur: Duration, job: F) where F: FnOnce() + Send + 'static {
let job = Job {
type_: JobType::Once(Thunk::new(job)),
time: (time::precise_time_ns() as i64 + dur.num_nanoseconds().unwrap()) as u64,
time: Instant::now() + dur,
};
self.shared.run(job)
}

pub fn run_at_fixed_rate<F>(&self, rate: Duration, f: F) where F: FnMut() + Send + 'static {
let job = Job {
type_: JobType::FixedRate { f: Box::new(f), rate: rate },
time: (time::precise_time_ns() as i64 + rate.num_nanoseconds().unwrap()) as u64,
time: Instant::now() + rate,
};
self.shared.run(job)
}
Expand Down Expand Up @@ -174,24 +173,18 @@ impl Worker {

let mut inner = self.shared.inner.lock().unwrap();
loop {
let now = time::precise_time_ns();
let now = Instant::now();

let need = match inner.queue.peek() {
None if inner.shutdown => return None,
None => Need::Wait,
Some(e) if e.time <= now => break,
Some(e) => Need::WaitTimeout(Duration::nanoseconds(e.time as i64 - now as i64)),
Some(e) => Need::WaitTimeout(e.time.duration_from_earlier(now)),
};

inner = match need {
Need::Wait => self.shared.cvar.wait(inner).unwrap(),
Need::WaitTimeout(t) => {
let mut timeout = t.num_milliseconds();
if timeout < 0 {
timeout = 0;
}
self.shared.cvar.wait_timeout_ms(inner, timeout as u32).unwrap().0
},
Need::WaitTimeout(t) => self.shared.cvar.wait_timeout(inner, t).unwrap().0,
};
}

Expand All @@ -205,7 +198,7 @@ impl Worker {
f();
let new_job = Job {
type_: JobType::FixedRate { f: f, rate: rate },
time: (job.time as i64 + rate.num_nanoseconds().unwrap()) as u64,
time: job.time + rate,
};
self.shared.run(new_job)
}
Expand All @@ -217,7 +210,7 @@ impl Worker {
mod test {
use std::sync::mpsc::channel;
use std::sync::{Arc, Barrier};
use time::Duration;
use std::time::Duration;

use super::ScheduledThreadPool;

Expand Down Expand Up @@ -279,8 +272,8 @@ mod test {
let (tx, rx) = channel();

let tx1 = tx.clone();
pool.run_after(Duration::seconds(1), move || tx1.send(1usize).unwrap());
pool.run_after(Duration::milliseconds(500), move || tx.send(2usize).unwrap());
pool.run_after(Duration::from_secs(1), move || tx1.send(1usize).unwrap());
pool.run_after(Duration::from_millis(500), move || tx.send(2usize).unwrap());

assert_eq!(2, rx.recv().unwrap());
assert_eq!(1, rx.recv().unwrap());
Expand All @@ -292,8 +285,8 @@ mod test {
let (tx, rx) = channel();

let tx1 = tx.clone();
pool.run_after(Duration::seconds(1), move || tx1.send(1usize).unwrap());
pool.run_after(Duration::milliseconds(500), move || tx.send(2usize).unwrap());
pool.run_after(Duration::from_secs(1), move || tx1.send(1usize).unwrap());
pool.run_after(Duration::from_millis(500), move || tx.send(2usize).unwrap());

drop(pool);

Expand All @@ -309,7 +302,7 @@ mod test {

let mut pool2 = Some(pool.clone());
let mut i = 0i32;
pool.run_at_fixed_rate(Duration::milliseconds(500), move || {
pool.run_at_fixed_rate(Duration::from_millis(500), move || {
i += 1;
tx.send(i).unwrap();
rx2.recv().unwrap();
Expand Down

0 comments on commit 6b1f1dc

Please sign in to comment.