Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Use coarsetime consistently #366

Merged
merged 3 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions rust-arroyo/examples/base_processor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate rust_arroyo;

use chrono::Duration;
use std::time::Duration;

use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::InitialOffset;
Expand All @@ -12,7 +13,7 @@ use rust_arroyo::types::Topic;
struct TestFactory {}
impl ProcessingStrategyFactory<KafkaPayload> for TestFactory {
fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
Box::new(CommitOffsets::new(Duration::seconds(1)))
Box::new(CommitOffsets::new(Duration::from_secs(1)))
}
}

Expand Down
3 changes: 2 additions & 1 deletion rust-arroyo/src/metrics/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Display for MetricValue {
}

macro_rules! into_metric_value {
($($from:ident),+ => $variant:ident) => {
($($from:path),+ => $variant:ident) => {
$(
impl From<$from> for MetricValue {
#[inline(always)]
Expand All @@ -82,6 +82,7 @@ into_metric_value!(i8, i16, i32, i64 => I64);
into_metric_value!(u8, u16, u32, u64 => U64);
into_metric_value!(f32, f64 => F64);
into_metric_value!(Duration => Duration);
into_metric_value!(coarsetime::Duration => Duration);

/// An alias for a list of Metric tags.
pub type MetricTags<'a> = &'a [(Option<&'a dyn Display>, &'a dyn Display)];
Expand Down
29 changes: 18 additions & 11 deletions rust-arroyo/src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

use parking_lot::{Mutex, MutexGuard};
use thiserror::Error;
Expand Down Expand Up @@ -124,14 +124,17 @@ impl<TPayload: Send + Sync + 'static> AssignmentCallbacks for Callbacks<TPayload
partitions.len() as i64
);

let start = Instant::now();
let start = coarsetime::Instant::recent();

let mut state = self.0.locked_state();
state.processing_factory.update_partitions(&partitions);
state.strategy = Some(state.processing_factory.create());
state.dlq_policy.reset_dlq_limits(&partitions);

timer!("arroyo.consumer.create_strategy.time", start.elapsed());
timer!(
"arroyo.consumer.create_strategy.time",
start.elapsed_since_recent()
);
}

fn on_revoke<C: CommitOffsets>(&self, commit_offsets: C, partitions: Vec<Partition>) {
Expand All @@ -141,7 +144,7 @@ impl<TPayload: Send + Sync + 'static> AssignmentCallbacks for Callbacks<TPayload
partitions.len() as i64,
);

let start = Instant::now();
let start = coarsetime::Instant::recent();

let mut state = self.0.locked_state();
if let Some(s) = state.strategy.as_mut() {
Expand Down Expand Up @@ -174,7 +177,7 @@ impl<TPayload: Send + Sync + 'static> AssignmentCallbacks for Callbacks<TPayload
self.0.set_paused(false);
state.clear_backpressure();

timer!("arroyo.consumer.join.time", start.elapsed());
timer!("arroyo.consumer.join.time", start.elapsed_since_recent());

tracing::info!("Partition revocation complete.");

Expand All @@ -193,6 +196,7 @@ pub struct StreamProcessor<TPayload: Clone> {
processor_handle: ProcessorHandle,
buffered_messages: BufferedMessages<TPayload>,
metrics_buffer: metrics_buffer::MetricsBuffer,
_time_updater: coarsetime::Updater,
}

impl StreamProcessor<KafkaPayload> {
Expand Down Expand Up @@ -231,6 +235,7 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
},
buffered_messages: BufferedMessages::new(max_buffered_messages_per_partition),
metrics_buffer: metrics_buffer::MetricsBuffer::new(),
_time_updater: coarsetime::Updater::new(10).start().unwrap(),
}
}

Expand All @@ -255,12 +260,14 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
} else if self.message.is_none() {
// Otherwise, we need to try fetch a new message from the consumer,
// even if there is no active assignment and/or processing strategy.
let poll_start = Instant::now();
let poll_start = coarsetime::Instant::recent();
//TODO: Support errors properly
match self.consumer.poll(Some(Duration::from_secs(1))) {
Ok(msg) => {
self.metrics_buffer
.incr_timing("arroyo.consumer.poll.time", poll_start.elapsed());
self.metrics_buffer.incr_timing(
"arroyo.consumer.poll.time",
poll_start.elapsed_since_recent().into(),
);

if let Some(broker_msg) = msg {
self.message = Some(Message {
Expand Down Expand Up @@ -290,7 +297,7 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
Some(_) => return Err(RunError::InvalidState),
}
};
let processing_start = Instant::now();
let processing_start = coarsetime::Instant::recent();

match strategy.poll() {
Ok(None) => {}
Expand Down Expand Up @@ -322,15 +329,15 @@ impl<TPayload: Clone + Send + Sync + 'static> StreamProcessor<TPayload> {
let Some(msg_s) = self.message.take() else {
self.metrics_buffer.incr_timing(
"arroyo.consumer.processing.time",
processing_start.elapsed(),
processing_start.elapsed_since_recent().into(),
);
return Ok(());
};

let ret = strategy.submit(msg_s);
self.metrics_buffer.incr_timing(
"arroyo.consumer.processing.time",
processing_start.elapsed(),
processing_start.elapsed_since_recent().into(),
);

match ret {
Expand Down
34 changes: 20 additions & 14 deletions rust-arroyo/src/processing/strategies/commit_offsets.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::time::Duration;

use chrono::{DateTime, Duration, Utc};
use chrono::Utc;

use crate::processing::strategies::{CommitRequest, ProcessingStrategy, SubmitError};
use crate::timer;
Expand All @@ -10,35 +11,38 @@ use super::StrategyError;

pub struct CommitOffsets {
partitions: HashMap<Partition, u64>,
last_commit_time: DateTime<Utc>,
last_record_time: DateTime<Utc>,
commit_frequency: Duration,
last_commit_time: coarsetime::Instant,
last_record_time: coarsetime::Instant,
commit_frequency: coarsetime::Duration,
}

impl CommitOffsets {
pub fn new(commit_frequency: Duration) -> Self {
CommitOffsets {
partitions: Default::default(),
last_commit_time: Utc::now(),
last_record_time: Utc::now(),
commit_frequency,
last_commit_time: coarsetime::Instant::recent(),
last_record_time: coarsetime::Instant::recent(),
commit_frequency: commit_frequency.into(),
}
}

fn commit(&mut self, force: bool) -> Option<CommitRequest> {
if Utc::now() - self.last_commit_time <= self.commit_frequency && !force {
// check if there is anything to commit first, since this is much cheaper than getting the
// current time
if self.partitions.is_empty() {
return None;
}

if self.partitions.is_empty() {
if coarsetime::Instant::recent() - self.last_commit_time <= self.commit_frequency && !force
{
return None;
}

let ret = Some(CommitRequest {
positions: self.partitions.clone(),
});
self.partitions.clear();
self.last_commit_time = Utc::now();
self.last_commit_time = coarsetime::Instant::recent();
ret
}
}
Expand All @@ -49,13 +53,13 @@ impl<T> ProcessingStrategy<T> for CommitOffsets {
}

fn submit(&mut self, message: Message<T>) -> Result<(), SubmitError<T>> {
let now = Utc::now();
if now - self.last_record_time > Duration::seconds(1) {
let now = coarsetime::Instant::recent();
if now - self.last_record_time > coarsetime::Duration::from_secs(1) {
if let Some(timestamp) = message.timestamp() {
// FIXME: this used to be in seconds
timer!(
"arroyo.consumer.latency",
(now - timestamp).to_std().unwrap_or_default()
(Utc::now() - timestamp).to_std().unwrap_or_default()
);
self.last_record_time = now;
}
Expand Down Expand Up @@ -93,6 +97,8 @@ mod tests {
#[test]
fn test_commit_offsets() {
tracing_subscriber::fmt().with_test_writer().init();
let updater = coarsetime::Updater::new(10).start().unwrap();

let partition1 = Partition::new(Topic::new("noop-commit"), 0);
let partition2 = Partition::new(Topic::new("noop-commit"), 1);
let timestamp = DateTime::from(SystemTime::now());
Expand All @@ -115,7 +121,7 @@ mod tests {
}),
};

let mut noop = CommitOffsets::new(chrono::Duration::seconds(1));
let mut noop = CommitOffsets::new(Duration::from_secs(1));

let mut commit_req1 = CommitRequest {
positions: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion rust-arroyo/src/utils/timing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct Deadline {

#[inline(always)]
fn now() -> coarsetime::Instant {
coarsetime::Instant::now_without_cache_update()
coarsetime::Instant::recent()
}

impl Deadline {
Expand Down
Loading