From 33caa0403c5c81ce674c369aa65b588153c261ca Mon Sep 17 00:00:00 2001 From: Evan Purkhiser Date: Mon, 1 Jul 2024 15:42:45 -0400 Subject: [PATCH] feat: Add Noop processing strategy This can be useful when you consume a message and do not wish to commit he offset back to Kafka [1]. [1]: The sentry uptime-checker project has this requirement as it is using Kafka as a mechanism to store configurations. By using log compaction we never actually want to commit a log offset, and want to read the entire log every time. --- arroyo/processing/strategies/noop.py | 34 +++++++++++++++++++ docs/source/strategies/index.rst | 1 + docs/source/strategies/noop.rst | 5 +++ rust-arroyo/examples/transform_and_produce.rs | 19 ++--------- rust-arroyo/src/processing/strategies/mod.rs | 1 + rust-arroyo/src/processing/strategies/noop.rs | 23 +++++++++++++ .../src/processing/strategies/produce.rs | 21 +----------- .../src/processing/strategies/run_task.rs | 22 +++--------- tests/processing/strategies/test_noop.py | 19 +++++++++++ 9 files changed, 90 insertions(+), 55 deletions(-) create mode 100644 arroyo/processing/strategies/noop.py create mode 100644 docs/source/strategies/noop.rst create mode 100644 rust-arroyo/src/processing/strategies/noop.rs create mode 100644 tests/processing/strategies/test_noop.py diff --git a/arroyo/processing/strategies/noop.py b/arroyo/processing/strategies/noop.py new file mode 100644 index 00000000..923b0b40 --- /dev/null +++ b/arroyo/processing/strategies/noop.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from typing import Optional, Union + +from arroyo.processing.strategies.abstract import ProcessingStrategy +from arroyo.types import FilteredPayload, Message, TStrategyPayload + + +class Noop( + ProcessingStrategy[Union[FilteredPayload, object]], +): + """ + Noop strategy that takes a message and does nothing. + """ + + def __init__(self) -> None: + pass + + def submit( + self, message: Message[Union[FilteredPayload, TStrategyPayload]] + ) -> None: + pass + + def poll(self) -> None: + pass + + def join(self, timeout: Optional[float] = None) -> None: + pass + + def close(self) -> None: + pass + + def terminate(self) -> None: + pass diff --git a/docs/source/strategies/index.rst b/docs/source/strategies/index.rst index 1c373df0..60d75e1a 100644 --- a/docs/source/strategies/index.rst +++ b/docs/source/strategies/index.rst @@ -35,4 +35,5 @@ Messages run_task_with_multiprocessing produce commit_offsets + noop healthcheck diff --git a/docs/source/strategies/noop.rst b/docs/source/strategies/noop.rst new file mode 100644 index 00000000..5a2dc622 --- /dev/null +++ b/docs/source/strategies/noop.rst @@ -0,0 +1,5 @@ +Noop +----------------------------- + +.. automodule:: arroyo.processing.strategies.noop + :members: diff --git a/rust-arroyo/examples/transform_and_produce.rs b/rust-arroyo/examples/transform_and_produce.rs index 56970137..a44f188b 100644 --- a/rust-arroyo/examples/transform_and_produce.rs +++ b/rust-arroyo/examples/transform_and_produce.rs @@ -9,18 +9,16 @@ use rust_arroyo::backends::kafka::config::KafkaConfig; use rust_arroyo::backends::kafka::producer::KafkaProducer; use rust_arroyo::backends::kafka::types::KafkaPayload; use rust_arroyo::backends::kafka::InitialOffset; +use rust_arroyo::processing::strategies::noop::Noop; use rust_arroyo::processing::strategies::produce::Produce; use rust_arroyo::processing::strategies::run_task::RunTask; use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; use rust_arroyo::processing::strategies::{ - CommitRequest, InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory, StrategyError, - SubmitError, + InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory, }; use rust_arroyo::processing::StreamProcessor; use rust_arroyo::types::{Message, Topic, TopicOrPartition}; -use std::time::Duration; - fn reverse_string(message: Message) -> Result, InvalidMessage> { let value = message.payload(); let payload = value.payload().unwrap(); @@ -36,19 +34,6 @@ fn reverse_string(message: Message) -> Result for Noop { - fn poll(&mut self) -> Result, StrategyError> { - Ok(None) - } - fn submit(&mut self, _message: Message) -> Result<(), SubmitError> { - Ok(()) - } - fn terminate(&mut self) {} - fn join(&mut self, _timeout: Option) -> Result, StrategyError> { - Ok(None) - } -} #[tokio::main] async fn main() { diff --git a/rust-arroyo/src/processing/strategies/mod.rs b/rust-arroyo/src/processing/strategies/mod.rs index fbf9014b..180c1f0f 100644 --- a/rust-arroyo/src/processing/strategies/mod.rs +++ b/rust-arroyo/src/processing/strategies/mod.rs @@ -4,6 +4,7 @@ use std::time::Duration; pub mod commit_offsets; pub mod healthcheck; +pub mod noop; pub mod produce; pub mod reduce; pub mod run_task; diff --git a/rust-arroyo/src/processing/strategies/noop.rs b/rust-arroyo/src/processing/strategies/noop.rs new file mode 100644 index 00000000..143357f6 --- /dev/null +++ b/rust-arroyo/src/processing/strategies/noop.rs @@ -0,0 +1,23 @@ +use std::time::Duration; + +use crate::types::Message; + +use super::{CommitRequest, ProcessingStrategy, StrategyError, SubmitError}; + +/// Noop strategy that takes a message and does nothing. +/// +/// This can be useful when you do not care to commit an offset. +pub struct Noop {} + +impl ProcessingStrategy for Noop { + fn poll(&mut self) -> Result, StrategyError> { + Ok(None) + } + fn submit(&mut self, _message: Message) -> Result<(), SubmitError> { + Ok(()) + } + fn terminate(&mut self) {} + fn join(&mut self, _timeout: Option) -> Result, StrategyError> { + Ok(None) + } +} diff --git a/rust-arroyo/src/processing/strategies/produce.rs b/rust-arroyo/src/processing/strategies/produce.rs index f3b51a9b..a433e26e 100644 --- a/rust-arroyo/src/processing/strategies/produce.rs +++ b/rust-arroyo/src/processing/strategies/produce.rs @@ -95,6 +95,7 @@ mod tests { use crate::backends::local::broker::LocalBroker; use crate::backends::local::LocalProducer; use crate::backends::storages::memory::MemoryMessageStorage; + use crate::processing::strategies::noop::Noop; use crate::processing::strategies::StrategyError; use crate::types::{BrokerMessage, InnerMessage, Partition, Topic}; use crate::utils::clock::TestingClock; @@ -152,26 +153,6 @@ mod tests { let partition = Partition::new(Topic::new("test"), 0); - struct Noop {} - impl ProcessingStrategy for Noop { - fn poll(&mut self) -> Result, StrategyError> { - Ok(None) - } - fn submit( - &mut self, - _message: Message, - ) -> Result<(), SubmitError> { - Ok(()) - } - fn terminate(&mut self) {} - fn join( - &mut self, - _timeout: Option, - ) -> Result, StrategyError> { - Ok(None) - } - } - let producer: KafkaProducer = KafkaProducer::new(config); let concurrency = ConcurrencyConfig::new(10); let mut strategy = Produce::new( diff --git a/rust-arroyo/src/processing/strategies/run_task.rs b/rust-arroyo/src/processing/strategies/run_task.rs index 73fd04b5..59dec1ba 100644 --- a/rust-arroyo/src/processing/strategies/run_task.rs +++ b/rust-arroyo/src/processing/strategies/run_task.rs @@ -101,7 +101,10 @@ impl ProcessingStrategy #[cfg(test)] mod tests { use super::*; - use crate::types::{BrokerMessage, InnerMessage, Message, Partition, Topic}; + use crate::{ + processing::strategies::noop::Noop, + types::{BrokerMessage, InnerMessage, Message, Partition, Topic}, + }; use chrono::Utc; #[test] @@ -110,23 +113,6 @@ mod tests { Ok(value) } - struct Noop {} - impl ProcessingStrategy for Noop { - fn poll(&mut self) -> Result, StrategyError> { - Ok(None) - } - fn submit(&mut self, _message: Message) -> Result<(), SubmitError> { - Ok(()) - } - fn terminate(&mut self) {} - fn join( - &mut self, - _timeout: Option, - ) -> Result, StrategyError> { - Ok(None) - } - } - let mut strategy = RunTask::new(identity, Noop {}); let partition = Partition::new(Topic::new("test"), 0); diff --git a/tests/processing/strategies/test_noop.py b/tests/processing/strategies/test_noop.py new file mode 100644 index 00000000..c1ec4eb7 --- /dev/null +++ b/tests/processing/strategies/test_noop.py @@ -0,0 +1,19 @@ +from datetime import datetime + +from arroyo.processing.strategies.noop import Noop +from arroyo.types import Message, Partition, Topic, Value + + +def test_noop() -> None: + """ + Test that the interface of the noop strategy is correct. + """ + now = datetime.now() + + strategy = Noop() + partition = Partition(Topic("topic"), 0) + + strategy.submit(Message(Value(b"hello", {partition: 1}, now))) + strategy.poll() + strategy.submit(Message(Value(b"world", {partition: 2}, now))) + strategy.poll()