Skip to content

Commit

Permalink
feat: Add Noop processing strategy
Browse files Browse the repository at this point in the history
This can be useful when you consume a message and do not wish to commit
he offset back to Kafka [1].

[1]: The sentry uptime-checker project has this requirement as it is
     using Kafka as a mechanism to store configurations. By using log
     compaction we never actually want to commit a log offset, and want
     to read the entire log every time.
  • Loading branch information
evanpurkhiser committed Jul 1, 2024
1 parent 8b0bdee commit c0afd21
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 55 deletions.
34 changes: 34 additions & 0 deletions arroyo/processing/strategies/noop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

from typing import Generic, Optional, TypeVar, Union

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.types import FilteredPayload, Message, TStrategyPayload


class Noop(
ProcessingStrategy[Union[FilteredPayload, object]],
):
"""
Noop strategy that takes a message and does nothing.
"""

def __init__(self) -> None:
pass

def submit(
self, message: Message[Union[FilteredPayload, TStrategyPayload]]
) -> None:
pass

def poll(self) -> None:
pass

def join(self, timeout: Optional[float] = None) -> None:
pass

def close(self) -> None:
pass

def terminate(self) -> None:
pass
1 change: 1 addition & 0 deletions docs/source/strategies/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ Messages
run_task_with_multiprocessing
produce
commit_offsets
noop
healthcheck
5 changes: 5 additions & 0 deletions docs/source/strategies/noop.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Noop
-----------------------------

.. automodule:: arroyo.processing.strategies.noop
:members:
19 changes: 2 additions & 17 deletions rust-arroyo/examples/transform_and_produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@ use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::producer::KafkaProducer;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::InitialOffset;
use rust_arroyo::processing::strategies::noop::Noop;
use rust_arroyo::processing::strategies::produce::Produce;
use rust_arroyo::processing::strategies::run_task::RunTask;
use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig;
use rust_arroyo::processing::strategies::{
CommitRequest, InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory, StrategyError,
SubmitError,
InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory,
};
use rust_arroyo::processing::StreamProcessor;
use rust_arroyo::types::{Message, Topic, TopicOrPartition};

use std::time::Duration;

fn reverse_string(message: Message<KafkaPayload>) -> Result<Message<KafkaPayload>, InvalidMessage> {
let value = message.payload();
let payload = value.payload().unwrap();
Expand All @@ -36,19 +34,6 @@ fn reverse_string(message: Message<KafkaPayload>) -> Result<Message<KafkaPayload
);
Ok(message.replace(result))
}
struct Noop {}
impl ProcessingStrategy<KafkaPayload> for Noop {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
fn submit(&mut self, _message: Message<KafkaPayload>) -> Result<(), SubmitError<KafkaPayload>> {
Ok(())
}
fn terminate(&mut self) {}
fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
}

#[tokio::main]
async fn main() {
Expand Down
1 change: 1 addition & 0 deletions rust-arroyo/src/processing/strategies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;

pub mod commit_offsets;
pub mod healthcheck;
pub mod noop;
pub mod produce;
pub mod reduce;
pub mod run_task;
Expand Down
23 changes: 23 additions & 0 deletions rust-arroyo/src/processing/strategies/noop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::time::Duration;

use crate::types::Message;

use super::{CommitRequest, ProcessingStrategy, StrategyError, SubmitError};

/// Noop strategy that takes a message and does nothing.
///
/// This can be useful when you do not care to commit an offset.
pub struct Noop {}

impl<TPayload> ProcessingStrategy<TPayload> for Noop {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
fn submit(&mut self, _message: Message<TPayload>) -> Result<(), SubmitError<TPayload>> {
Ok(())
}
fn terminate(&mut self) {}
fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
}
21 changes: 1 addition & 20 deletions rust-arroyo/src/processing/strategies/produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ mod tests {
use crate::backends::local::broker::LocalBroker;
use crate::backends::local::LocalProducer;
use crate::backends::storages::memory::MemoryMessageStorage;
use crate::processing::strategies::noop::Noop;
use crate::processing::strategies::StrategyError;
use crate::types::{BrokerMessage, InnerMessage, Partition, Topic};
use crate::utils::clock::TestingClock;
Expand Down Expand Up @@ -152,26 +153,6 @@ mod tests {

let partition = Partition::new(Topic::new("test"), 0);

struct Noop {}
impl ProcessingStrategy<KafkaPayload> for Noop {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
fn submit(
&mut self,
_message: Message<KafkaPayload>,
) -> Result<(), SubmitError<KafkaPayload>> {
Ok(())
}
fn terminate(&mut self) {}
fn join(
&mut self,
_timeout: Option<Duration>,
) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
}

let producer: KafkaProducer = KafkaProducer::new(config);
let concurrency = ConcurrencyConfig::new(10);
let mut strategy = Produce::new(
Expand Down
22 changes: 4 additions & 18 deletions rust-arroyo/src/processing/strategies/run_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ impl<TPayload, TTransformed: Send + Sync> ProcessingStrategy<TPayload>
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{BrokerMessage, InnerMessage, Message, Partition, Topic};
use crate::{
processing::strategies::noop::Noop,
types::{BrokerMessage, InnerMessage, Message, Partition, Topic},
};
use chrono::Utc;

#[test]
Expand All @@ -110,23 +113,6 @@ mod tests {
Ok(value)
}

struct Noop {}
impl ProcessingStrategy<String> for Noop {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
fn submit(&mut self, _message: Message<String>) -> Result<(), SubmitError<String>> {
Ok(())
}
fn terminate(&mut self) {}
fn join(
&mut self,
_timeout: Option<Duration>,
) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
}

let mut strategy = RunTask::new(identity, Noop {});

let partition = Partition::new(Topic::new("test"), 0);
Expand Down
19 changes: 19 additions & 0 deletions tests/processing/strategies/test_noop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from datetime import datetime

from arroyo.processing.strategies.noop import Noop
from arroyo.types import Message, Partition, Topic, Value


def test_noop() -> None:
"""
Test that the interface of the noop strategy is correct.
"""
now = datetime.now()

strategy = Noop()
partition = Partition(Topic("topic"), 0)

strategy.submit(Message(Value(b"hello", {partition: 1}, now)))
strategy.poll()
strategy.submit(Message(Value(b"world", {partition: 2}, now)))
strategy.poll()

0 comments on commit c0afd21

Please sign in to comment.