diff --git a/rust-arroyo/examples/base_processor.rs b/rust-arroyo/examples/base_processor.rs index 2b652fe7..012cb70d 100644 --- a/rust-arroyo/examples/base_processor.rs +++ b/rust-arroyo/examples/base_processor.rs @@ -1,6 +1,7 @@ extern crate rust_arroyo; -use chrono::Duration; +use std::time::Duration; + use rust_arroyo::backends::kafka::config::KafkaConfig; use rust_arroyo::backends::kafka::types::KafkaPayload; use rust_arroyo::backends::kafka::InitialOffset; @@ -12,7 +13,7 @@ use rust_arroyo::types::Topic; struct TestFactory {} impl ProcessingStrategyFactory for TestFactory { fn create(&self) -> Box> { - Box::new(CommitOffsets::new(Duration::seconds(1))) + Box::new(CommitOffsets::new(Duration::from_secs(1))) } } diff --git a/rust-arroyo/src/metrics/types.rs b/rust-arroyo/src/metrics/types.rs index 3c1396bb..ea44eaed 100644 --- a/rust-arroyo/src/metrics/types.rs +++ b/rust-arroyo/src/metrics/types.rs @@ -66,7 +66,7 @@ impl Display for MetricValue { } macro_rules! into_metric_value { - ($($from:ident),+ => $variant:ident) => { + ($($from:path),+ => $variant:ident) => { $( impl From<$from> for MetricValue { #[inline(always)] @@ -82,6 +82,7 @@ into_metric_value!(i8, i16, i32, i64 => I64); into_metric_value!(u8, u16, u32, u64 => U64); into_metric_value!(f32, f64 => F64); into_metric_value!(Duration => Duration); +into_metric_value!(coarsetime::Duration => Duration); /// An alias for a list of Metric tags. pub type MetricTags<'a> = &'a [(Option<&'a dyn Display>, &'a dyn Display)]; diff --git a/rust-arroyo/src/processing/mod.rs b/rust-arroyo/src/processing/mod.rs index 329e264d..de246a3b 100644 --- a/rust-arroyo/src/processing/mod.rs +++ b/rust-arroyo/src/processing/mod.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::panic::{self, AssertUnwindSafe}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use parking_lot::{Mutex, MutexGuard}; use thiserror::Error; @@ -124,14 +124,17 @@ impl AssignmentCallbacks for Callbacks(&self, commit_offsets: C, partitions: Vec) { @@ -141,7 +144,7 @@ impl AssignmentCallbacks for Callbacks AssignmentCallbacks for Callbacks { processor_handle: ProcessorHandle, buffered_messages: BufferedMessages, metrics_buffer: metrics_buffer::MetricsBuffer, + _time_updater: coarsetime::Updater, } impl StreamProcessor { @@ -230,6 +234,7 @@ impl StreamProcessor { }, buffered_messages: BufferedMessages::new(max_buffered_messages_per_partition), metrics_buffer: metrics_buffer::MetricsBuffer::new(), + _time_updater: coarsetime::Updater::new(10).start().unwrap(), } } @@ -254,12 +259,14 @@ impl StreamProcessor { } else if self.message.is_none() { // Otherwise, we need to try fetch a new message from the consumer, // even if there is no active assignment and/or processing strategy. - let poll_start = Instant::now(); + let poll_start = coarsetime::Instant::recent(); //TODO: Support errors properly match self.consumer.poll(Some(Duration::from_secs(1))) { Ok(msg) => { - self.metrics_buffer - .incr_timing("arroyo.consumer.poll.time", poll_start.elapsed()); + self.metrics_buffer.incr_timing( + "arroyo.consumer.poll.time", + poll_start.elapsed_since_recent().into(), + ); if let Some(broker_msg) = msg { self.message = Some(Message { @@ -289,7 +296,7 @@ impl StreamProcessor { Some(_) => return Err(RunError::InvalidState), } }; - let processing_start = Instant::now(); + let processing_start = coarsetime::Instant::recent(); match strategy.poll() { Ok(None) => {} @@ -321,7 +328,7 @@ impl StreamProcessor { let Some(msg_s) = self.message.take() else { self.metrics_buffer.incr_timing( "arroyo.consumer.processing.time", - processing_start.elapsed(), + processing_start.elapsed_since_recent().into(), ); return Ok(()); }; @@ -329,7 +336,7 @@ impl StreamProcessor { let ret = strategy.submit(msg_s); self.metrics_buffer.incr_timing( "arroyo.consumer.processing.time", - processing_start.elapsed(), + processing_start.elapsed_since_recent().into(), ); match ret { diff --git a/rust-arroyo/src/processing/strategies/commit_offsets.rs b/rust-arroyo/src/processing/strategies/commit_offsets.rs index 8c3fb052..398fbfa9 100644 --- a/rust-arroyo/src/processing/strategies/commit_offsets.rs +++ b/rust-arroyo/src/processing/strategies/commit_offsets.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; +use std::time::Duration; -use chrono::{DateTime, Duration, Utc}; +use chrono::Utc; use crate::processing::strategies::{CommitRequest, ProcessingStrategy, SubmitError}; use crate::timer; @@ -10,27 +11,30 @@ use super::StrategyError; pub struct CommitOffsets { partitions: HashMap, - last_commit_time: DateTime, - last_record_time: DateTime, - commit_frequency: Duration, + last_commit_time: coarsetime::Instant, + last_record_time: coarsetime::Instant, + commit_frequency: coarsetime::Duration, } impl CommitOffsets { pub fn new(commit_frequency: Duration) -> Self { CommitOffsets { partitions: Default::default(), - last_commit_time: Utc::now(), - last_record_time: Utc::now(), - commit_frequency, + last_commit_time: coarsetime::Instant::recent(), + last_record_time: coarsetime::Instant::recent(), + commit_frequency: commit_frequency.into(), } } fn commit(&mut self, force: bool) -> Option { - if Utc::now() - self.last_commit_time <= self.commit_frequency && !force { + // check if there is anything to commit first, since this is much cheaper than getting the + // current time + if self.partitions.is_empty() { return None; } - if self.partitions.is_empty() { + if coarsetime::Instant::recent() - self.last_commit_time <= self.commit_frequency && !force + { return None; } @@ -38,7 +42,7 @@ impl CommitOffsets { positions: self.partitions.clone(), }); self.partitions.clear(); - self.last_commit_time = Utc::now(); + self.last_commit_time = coarsetime::Instant::recent(); ret } } @@ -49,13 +53,13 @@ impl ProcessingStrategy for CommitOffsets { } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { - let now = Utc::now(); - if now - self.last_record_time > Duration::seconds(1) { + let now = coarsetime::Instant::recent(); + if now - self.last_record_time > coarsetime::Duration::from_secs(1) { if let Some(timestamp) = message.timestamp() { // FIXME: this used to be in seconds timer!( "arroyo.consumer.latency", - (now - timestamp).to_std().unwrap_or_default() + (Utc::now() - timestamp).to_std().unwrap_or_default() ); self.last_record_time = now; } @@ -91,6 +95,8 @@ mod tests { #[test] fn test_commit_offsets() { tracing_subscriber::fmt().with_test_writer().init(); + let updater = coarsetime::Updater::new(10).start().unwrap(); + let partition1 = Partition::new(Topic::new("noop-commit"), 0); let partition2 = Partition::new(Topic::new("noop-commit"), 1); let timestamp = DateTime::from(SystemTime::now()); @@ -113,7 +119,7 @@ mod tests { }), }; - let mut noop = CommitOffsets::new(chrono::Duration::seconds(1)); + let mut noop = CommitOffsets::new(Duration::from_secs(1)); let mut commit_req1 = CommitRequest { positions: Default::default(), diff --git a/rust-arroyo/src/utils/timing.rs b/rust-arroyo/src/utils/timing.rs index a437b23b..a0af8b08 100644 --- a/rust-arroyo/src/utils/timing.rs +++ b/rust-arroyo/src/utils/timing.rs @@ -9,7 +9,7 @@ pub struct Deadline { #[inline(always)] fn now() -> coarsetime::Instant { - coarsetime::Instant::now_without_cache_update() + coarsetime::Instant::recent() } impl Deadline {