Skip to content

Commit

Permalink
chore(ci): Clippy fixes, bump rdkafka (#397)
Browse files Browse the repository at this point in the history
* chore(ci): Clippy fixes

* bump rdkafka
  • Loading branch information
untitaker authored Dec 4, 2024
1 parent 6712435 commit 67a6367
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 32 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ chrono = "0.4.26"
coarsetime = "0.1.33"
once_cell = "1.18.0"
rand = "0.8.5"
rdkafka = { version = "0.36.1", features = ["cmake-build", "tracing"] }
rdkafka = { version = "0.37.0", features = ["cmake-build", "tracing"] }
sentry = { version = "0.32.0" }
serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1.0.81"
Expand All @@ -26,9 +26,6 @@ parking_lot = "0.12.1"
[dev-dependencies]
tracing-subscriber = "0.3.18"

[patch.crates-io]
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka" }

[[example]]
name = "base_processor"
path = "rust-arroyo/examples/base_processor.rs"
Expand Down
9 changes: 3 additions & 6 deletions rust-arroyo/src/backends/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::ConsumerError;
use crate::backends::kafka::types::KafkaPayload;
use crate::gauge;
use crate::types::{BrokerMessage, Partition, Topic};
use chrono::{DateTime, NaiveDateTime, Utc};
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use rdkafka::bindings::rd_kafka_memberid;
use rdkafka::client::ClientContext;
Expand Down Expand Up @@ -102,10 +102,7 @@ fn create_kafka_message(topics: &[Topic], msg: BorrowedMessage) -> BrokerMessage
),
partition,
msg.offset() as u64,
DateTime::from_naive_utc_and_offset(
NaiveDateTime::from_timestamp_millis(time_millis).unwrap_or(NaiveDateTime::MIN),
Utc,
),
DateTime::from_timestamp_millis(time_millis).unwrap_or(DateTime::<Utc>::MIN_UTC),
)
}

Expand All @@ -130,7 +127,7 @@ struct OffsetCommitter<'a, C: AssignmentCallbacks> {
consumer: &'a BaseConsumer<CustomContext<C>>,
}

impl<'a, C: AssignmentCallbacks> CommitOffsets for OffsetCommitter<'a, C> {
impl<C: AssignmentCallbacks> CommitOffsets for OffsetCommitter<'_, C> {
fn commit(self, offsets: HashMap<Partition, u64>) -> Result<(), ConsumerError> {
commit_impl(self.consumer, offsets)
}
Expand Down
2 changes: 1 addition & 1 deletion rust-arroyo/src/backends/kafka/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<'a> KafkaPayload {
pub fn to_base_record(
&'a self,
destination: &'a TopicOrPartition,
) -> BaseRecord<'_, Vec<u8>, Vec<u8>> {
) -> BaseRecord<'a, Vec<u8>, Vec<u8>> {
let topic = match destination {
TopicOrPartition::Topic(topic) => topic.as_str(),
TopicOrPartition::Partition(partition) => partition.topic.as_str(),
Expand Down
2 changes: 1 addition & 1 deletion rust-arroyo/src/backends/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct OffsetCommitter<'a, TPayload> {
broker: &'a mut LocalBroker<TPayload>,
}

impl<'a, TPayload> CommitOffsets for OffsetCommitter<'a, TPayload> {
impl<TPayload> CommitOffsets for OffsetCommitter<'_, TPayload> {
fn commit(self, offsets: HashMap<Partition, u64>) -> Result<(), ConsumerError> {
self.broker.commit(self.group, offsets);
Ok(())
Expand Down
14 changes: 8 additions & 6 deletions rust-arroyo/src/backends/storages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@ pub trait MessageStorage<TPayload>: Send {
/// the tail of the partition, this method returns `Ok(None)`.
///
/// # Errors
///
/// * If the offset is out of range (there are no messages, and we're not
/// reading from the tail of the partition where the next message would
/// be if it existed), [`OffsetOutOfRange`] will be returned.
/// reading from the tail of the partition where the next message would
/// be if it existed), [`OffsetOutOfRange`] will be returned.
///
/// * If the topic does not exist, [`TopicDoesNotExist`] will
/// be returned.
/// be returned.
///
/// * If the topic exists but the partition does not,
/// [`PartitionDoesNotExist`] will be returned.
/// [`PartitionDoesNotExist`] will be returned.
fn consume(
&self,
partition: &Partition,
Expand All @@ -67,11 +68,12 @@ pub trait MessageStorage<TPayload>: Send {
/// Produce a single message to the provided partition.
///
/// # Errors
///
/// * If the topic does not exist, [`TopicDoesNotExist`] will
/// be returned.
/// be returned.
///
/// * If the topic exists but the partition does not,
/// [`PartitionDoesNotExist`] will be returned.
/// [`PartitionDoesNotExist`] will be returned.
fn produce(
&mut self,
partition: &Partition,
Expand Down
21 changes: 8 additions & 13 deletions rust-arroyo/src/processing/strategies/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use std::time::Duration;

use super::InvalidMessage;

type Acc<T, TResult> =
Arc<dyn Fn(TResult, Message<T>) -> Result<TResult, (SubmitError<T>, TResult)> + Send + Sync>;

struct BatchState<T, TResult> {
value: Option<TResult>,
accumulator: Arc<
dyn Fn(TResult, Message<T>) -> Result<TResult, (SubmitError<T>, TResult)> + Send + Sync,
>,
accumulator: Acc<T, TResult>,
offsets: BTreeMap<Partition, u64>,
batch_start_time: Deadline,
message_count: usize,
Expand All @@ -26,9 +27,7 @@ struct BatchState<T, TResult> {
impl<T, TResult> BatchState<T, TResult> {
fn new(
initial_value: TResult,
accumulator: Arc<
dyn Fn(TResult, Message<T>) -> Result<TResult, (SubmitError<T>, TResult)> + Send + Sync,
>,
accumulator: Acc<T, TResult>,
max_batch_time: Duration,
compute_batch_size: fn(&T) -> usize,
) -> BatchState<T, TResult> {
Expand All @@ -44,7 +43,7 @@ impl<T, TResult> BatchState<T, TResult> {

fn add(&mut self, message: Message<T>) -> Result<(), SubmitError<T>> {
let commitable: Vec<_> = message.committable().collect();
let message_count = (self.compute_batch_size)(&message.payload());
let message_count = (self.compute_batch_size)(message.payload());
let prev_result = self.value.take().unwrap();

match (self.accumulator)(prev_result, message) {
Expand All @@ -66,9 +65,7 @@ impl<T, TResult> BatchState<T, TResult> {

pub struct Reduce<T, TResult> {
next_step: Box<dyn ProcessingStrategy<TResult>>,
accumulator: Arc<
dyn Fn(TResult, Message<T>) -> Result<TResult, (SubmitError<T>, TResult)> + Send + Sync,
>,
accumulator: Acc<T, TResult>,
initial_value: Arc<dyn Fn() -> TResult + Send + Sync>,
max_batch_size: usize,
max_batch_time: Duration,
Expand Down Expand Up @@ -139,9 +136,7 @@ impl<T: Send + Sync, TResult: Send + Sync> ProcessingStrategy<T> for Reduce<T, T
impl<T, TResult> Reduce<T, TResult> {
pub fn new<N>(
next_step: N,
accumulator: Arc<
dyn Fn(TResult, Message<T>) -> Result<TResult, (SubmitError<T>, TResult)> + Send + Sync,
>,
accumulator: Acc<T, TResult>,
initial_value: Arc<dyn Fn() -> TResult + Send + Sync>,
max_batch_size: usize,
max_batch_time: Duration,
Expand Down
2 changes: 1 addition & 1 deletion rust-arroyo/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ enum CommittableInner<'a> {
#[derive(Debug, Clone)]
pub struct Committable<'a>(CommittableInner<'a>);

impl<'a> Iterator for Committable<'a> {
impl Iterator for Committable<'_> {
type Item = (Partition, u64);

fn next(&mut self) -> Option<Self::Item> {
Expand Down

0 comments on commit 67a6367

Please sign in to comment.