From 6b1f1dc3170a65874edfce414d6fad275dae4e0b Mon Sep 17 00:00:00 2001 From: Steven Fackler Date: Wed, 6 Jan 2016 23:37:59 -0800 Subject: [PATCH] Use stdlib Instant --- Cargo.toml | 1 - src/lib.rs | 63 +++++++++++++++++++++++++---------------------------- src/task.rs | 37 +++++++++++++------------------ 3 files changed, 45 insertions(+), 56 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 157acf9b..35dbfa52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,4 +11,3 @@ keywords = ["database", "pool"] [dependencies] log = "0.3" -time = "0.1.22" diff --git a/src/lib.rs b/src/lib.rs index 8564dff7..12c206ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,10 +40,10 @@ //! ``` #![warn(missing_docs)] #![doc(html_root_url="https://sfackler.github.io/r2d2/doc/v0.6.1")] +#![feature(time2)] #[macro_use] extern crate log; -extern crate time; use std::cmp; use std::collections::VecDeque; @@ -52,7 +52,7 @@ use std::fmt; use std::ops::{Deref, DerefMut}; use std::mem; use std::sync::{Arc, Mutex, Condvar}; -use time::{Duration, SteadyTime}; +use std::time::{Duration, Instant}; #[doc(inline)] pub use config::Config; @@ -143,16 +143,17 @@ impl CustomizeConnection for NopConnectionCustomizer {} struct Conn { conn: C, - birth: SteadyTime, - idle_start: SteadyTime, + birth: Instant, + idle_start: Instant, } impl Conn { fn new(conn: C) -> Conn { + let now = Instant::now(); Conn { conn: conn, - birth: SteadyTime::now(), - idle_start: SteadyTime::now(), + birth: now, + idle_start: now, } } } @@ -194,7 +195,7 @@ fn add_connection(shared: &Arc>, where M: ManageConnection { internals.pending_conns += 1; - inner(Duration::zero(), shared); + inner(Duration::from_secs(0), shared); fn inner(delay: Duration, shared: &Arc>) where M: ManageConnection { let new_shared = shared.clone(); @@ -213,8 +214,8 @@ fn add_connection(shared: &Arc>, } Err(err) => { shared.config.error_handler().handle_error(err); - let delay = cmp::max(Duration::milliseconds(200), delay); - let delay = cmp::min(cvt(shared.config.connection_timeout()) / 2, delay * 2); + let delay = cmp::max(Duration::from_millis(200), delay); + let delay = cmp::min(shared.config.connection_timeout() / 2, delay * 2); inner(delay, &shared); }, } @@ -228,14 +229,14 @@ fn reap_connections(shared: &Arc>) where M: ManageConnection { let mut internals = shared.internals.lock().unwrap(); mem::swap(&mut old, &mut internals.conns); - let now = SteadyTime::now(); + let now = Instant::now(); for conn in old { let mut reap = false; if let Some(timeout) = shared.config.idle_timeout() { - reap |= now - conn.idle_start >= cvt(timeout); + reap |= conn.idle_start + timeout <= now; } if let Some(lifetime) = shared.config.max_lifetime() { - reap |= now - conn.birth >= cvt(lifetime); + reap |= conn.birth + lifetime <= now; } if reap { drop_conn(shared, &mut internals); @@ -317,7 +318,7 @@ impl Pool where M: ManageConnection { // for testing fn new_inner(config: Config, manager: M, - reaper_rate: i64) + reaper_rate: u64) -> Result, InitializationError> { let internals = PoolInternals { conns: VecDeque::with_capacity(config.pool_size() as usize), @@ -342,23 +343,22 @@ impl Pool where M: ManageConnection { } if shared.config.initialization_fail_fast() { - let end = SteadyTime::now() + cvt(shared.config.connection_timeout()); + let end = Instant::now() + shared.config.connection_timeout(); let mut internals = shared.internals.lock().unwrap(); while internals.num_conns != shared.config.pool_size() { - let wait = end - SteadyTime::now(); - if wait <= Duration::zero() { + let now = Instant::now(); + if now >= end { return Err(InitializationError(())); } - internals = shared.cond.wait_timeout_ms(internals, - wait.num_milliseconds() as u32) - .unwrap().0; + let wait = end.duration_from_earlier(now); + internals = shared.cond.wait_timeout(internals, wait).unwrap().0; } } if shared.config.max_lifetime().is_some() || shared.config.idle_timeout().is_some() { let s = shared.clone(); - shared.thread_pool.run_at_fixed_rate(Duration::seconds(reaper_rate), + shared.thread_pool.run_at_fixed_rate(Duration::from_secs(reaper_rate), move || reap_connections(&s)); } @@ -368,7 +368,7 @@ impl Pool where M: ManageConnection { } fn get_inner(&self) -> Result, GetTimeout> { - let end = SteadyTime::now() + cvt(self.shared.config.connection_timeout()); + let end = Instant::now() + self.shared.config.connection_timeout(); let mut internals = self.shared.internals.lock().unwrap(); loop { @@ -392,16 +392,17 @@ impl Pool where M: ManageConnection { add_connection(&self.shared, &mut internals); } - let now = SteadyTime::now(); - let mut timeout = (end - now).num_milliseconds(); - if timeout < 0 { - timeout = 0 + let now = Instant::now(); + let timeout = if now > end { + Duration::from_secs(0) + } else { + end.duration_from_earlier(now) }; - let (new_internals, no_timeout) = - self.shared.cond.wait_timeout_ms(internals, timeout as u32).unwrap(); + let (new_internals, result) = + self.shared.cond.wait_timeout(internals, timeout).unwrap(); internals = new_internals; - if !no_timeout { + if result.timed_out() { return Err(GetTimeout(())); } } @@ -428,7 +429,7 @@ impl Pool where M: ManageConnection { if broken { drop_conn(&self.shared, &mut internals); } else { - conn.idle_start = SteadyTime::now(); + conn.idle_start = Instant::now(); internals.conns.push_back(conn); self.shared.cond.notify_one(); } @@ -470,7 +471,3 @@ impl DerefMut for PooledConnection where M: ManageConnection { &mut self.conn.as_mut().unwrap().conn } } - -fn cvt(d: std::time::Duration) -> Duration { - Duration::seconds(d.as_secs() as i64) + Duration::nanoseconds(d.subsec_nanos() as i64) -} diff --git a/src/task.rs b/src/task.rs index 545bba28..ba19485c 100644 --- a/src/task.rs +++ b/src/task.rs @@ -2,8 +2,7 @@ use std::collections::BinaryHeap; use std::cmp::{PartialOrd, Ord, PartialEq, Eq, Ordering}; use std::sync::{Arc, Mutex, Condvar}; use std::thread; -use time::Duration; -use time; +use std::time::{Duration, Instant}; use thunk::Thunk; @@ -17,7 +16,7 @@ enum JobType { struct Job { type_: JobType, - time: u64, + time: Instant, } impl PartialOrd for Job { @@ -107,13 +106,13 @@ impl ScheduledThreadPool { #[allow(dead_code)] pub fn run(&self, job: F) where F: FnOnce() + Send + 'static { - self.run_after(Duration::zero(), job) + self.run_after(Duration::from_secs(0), job) } pub fn run_after(&self, dur: Duration, job: F) where F: FnOnce() + Send + 'static { let job = Job { type_: JobType::Once(Thunk::new(job)), - time: (time::precise_time_ns() as i64 + dur.num_nanoseconds().unwrap()) as u64, + time: Instant::now() + dur, }; self.shared.run(job) } @@ -121,7 +120,7 @@ impl ScheduledThreadPool { pub fn run_at_fixed_rate(&self, rate: Duration, f: F) where F: FnMut() + Send + 'static { let job = Job { type_: JobType::FixedRate { f: Box::new(f), rate: rate }, - time: (time::precise_time_ns() as i64 + rate.num_nanoseconds().unwrap()) as u64, + time: Instant::now() + rate, }; self.shared.run(job) } @@ -174,24 +173,18 @@ impl Worker { let mut inner = self.shared.inner.lock().unwrap(); loop { - let now = time::precise_time_ns(); + let now = Instant::now(); let need = match inner.queue.peek() { None if inner.shutdown => return None, None => Need::Wait, Some(e) if e.time <= now => break, - Some(e) => Need::WaitTimeout(Duration::nanoseconds(e.time as i64 - now as i64)), + Some(e) => Need::WaitTimeout(e.time.duration_from_earlier(now)), }; inner = match need { Need::Wait => self.shared.cvar.wait(inner).unwrap(), - Need::WaitTimeout(t) => { - let mut timeout = t.num_milliseconds(); - if timeout < 0 { - timeout = 0; - } - self.shared.cvar.wait_timeout_ms(inner, timeout as u32).unwrap().0 - }, + Need::WaitTimeout(t) => self.shared.cvar.wait_timeout(inner, t).unwrap().0, }; } @@ -205,7 +198,7 @@ impl Worker { f(); let new_job = Job { type_: JobType::FixedRate { f: f, rate: rate }, - time: (job.time as i64 + rate.num_nanoseconds().unwrap()) as u64, + time: job.time + rate, }; self.shared.run(new_job) } @@ -217,7 +210,7 @@ impl Worker { mod test { use std::sync::mpsc::channel; use std::sync::{Arc, Barrier}; - use time::Duration; + use std::time::Duration; use super::ScheduledThreadPool; @@ -279,8 +272,8 @@ mod test { let (tx, rx) = channel(); let tx1 = tx.clone(); - pool.run_after(Duration::seconds(1), move || tx1.send(1usize).unwrap()); - pool.run_after(Duration::milliseconds(500), move || tx.send(2usize).unwrap()); + pool.run_after(Duration::from_secs(1), move || tx1.send(1usize).unwrap()); + pool.run_after(Duration::from_millis(500), move || tx.send(2usize).unwrap()); assert_eq!(2, rx.recv().unwrap()); assert_eq!(1, rx.recv().unwrap()); @@ -292,8 +285,8 @@ mod test { let (tx, rx) = channel(); let tx1 = tx.clone(); - pool.run_after(Duration::seconds(1), move || tx1.send(1usize).unwrap()); - pool.run_after(Duration::milliseconds(500), move || tx.send(2usize).unwrap()); + pool.run_after(Duration::from_secs(1), move || tx1.send(1usize).unwrap()); + pool.run_after(Duration::from_millis(500), move || tx.send(2usize).unwrap()); drop(pool); @@ -309,7 +302,7 @@ mod test { let mut pool2 = Some(pool.clone()); let mut i = 0i32; - pool.run_at_fixed_rate(Duration::milliseconds(500), move || { + pool.run_at_fixed_rate(Duration::from_millis(500), move || { i += 1; tx.send(i).unwrap(); rx2.recv().unwrap();