Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Noop processing strategy #376

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions arroyo/processing/strategies/noop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

from typing import Optional, Union

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.types import FilteredPayload, Message, TStrategyPayload


class Noop(
ProcessingStrategy[Union[FilteredPayload, object]],
):
"""
Noop strategy that takes a message and does nothing.
"""

def __init__(self) -> None:
pass

def submit(
self, message: Message[Union[FilteredPayload, TStrategyPayload]]
) -> None:
pass

def poll(self) -> None:
pass

def join(self, timeout: Optional[float] = None) -> None:
pass

def close(self) -> None:
pass

def terminate(self) -> None:
pass
1 change: 1 addition & 0 deletions docs/source/strategies/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ Messages
run_task_with_multiprocessing
produce
commit_offsets
noop
healthcheck
5 changes: 5 additions & 0 deletions docs/source/strategies/noop.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Noop
-----------------------------

.. automodule:: arroyo.processing.strategies.noop
:members:
19 changes: 2 additions & 17 deletions rust-arroyo/examples/transform_and_produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@ use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::producer::KafkaProducer;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::InitialOffset;
use rust_arroyo::processing::strategies::noop::Noop;
use rust_arroyo::processing::strategies::produce::Produce;
use rust_arroyo::processing::strategies::run_task::RunTask;
use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig;
use rust_arroyo::processing::strategies::{
CommitRequest, InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory, StrategyError,
SubmitError,
InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory,
};
use rust_arroyo::processing::StreamProcessor;
use rust_arroyo::types::{Message, Topic, TopicOrPartition};

use std::time::Duration;

fn reverse_string(message: Message<KafkaPayload>) -> Result<Message<KafkaPayload>, InvalidMessage> {
let value = message.payload();
let payload = value.payload().unwrap();
Expand All @@ -36,19 +34,6 @@ fn reverse_string(message: Message<KafkaPayload>) -> Result<Message<KafkaPayload
);
Ok(message.replace(result))
}
struct Noop {}
impl ProcessingStrategy<KafkaPayload> for Noop {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
fn submit(&mut self, _message: Message<KafkaPayload>) -> Result<(), SubmitError<KafkaPayload>> {
Ok(())
}
fn terminate(&mut self) {}
fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
}

#[tokio::main]
async fn main() {
Expand Down
1 change: 1 addition & 0 deletions rust-arroyo/src/processing/strategies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;

pub mod commit_offsets;
pub mod healthcheck;
pub mod noop;
pub mod produce;
pub mod reduce;
pub mod run_task;
Expand Down
23 changes: 23 additions & 0 deletions rust-arroyo/src/processing/strategies/noop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::time::Duration;

use crate::types::Message;

use super::{CommitRequest, ProcessingStrategy, StrategyError, SubmitError};

/// Noop strategy that takes a message and does nothing.
///
/// This can be useful when you do not care to commit an offset.
pub struct Noop {}

impl<TPayload> ProcessingStrategy<TPayload> for Noop {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
fn submit(&mut self, _message: Message<TPayload>) -> Result<(), SubmitError<TPayload>> {
Ok(())
}
fn terminate(&mut self) {}
fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
}
21 changes: 1 addition & 20 deletions rust-arroyo/src/processing/strategies/produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ mod tests {
use crate::backends::local::broker::LocalBroker;
use crate::backends::local::LocalProducer;
use crate::backends::storages::memory::MemoryMessageStorage;
use crate::processing::strategies::noop::Noop;
use crate::processing::strategies::StrategyError;
use crate::types::{BrokerMessage, InnerMessage, Partition, Topic};
use crate::utils::clock::TestingClock;
Expand Down Expand Up @@ -152,26 +153,6 @@ mod tests {

let partition = Partition::new(Topic::new("test"), 0);

struct Noop {}
impl ProcessingStrategy<KafkaPayload> for Noop {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
fn submit(
&mut self,
_message: Message<KafkaPayload>,
) -> Result<(), SubmitError<KafkaPayload>> {
Ok(())
}
fn terminate(&mut self) {}
fn join(
&mut self,
_timeout: Option<Duration>,
) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
}

let producer: KafkaProducer = KafkaProducer::new(config);
let concurrency = ConcurrencyConfig::new(10);
let mut strategy = Produce::new(
Expand Down
22 changes: 4 additions & 18 deletions rust-arroyo/src/processing/strategies/run_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ impl<TPayload, TTransformed: Send + Sync> ProcessingStrategy<TPayload>
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{BrokerMessage, InnerMessage, Message, Partition, Topic};
use crate::{
processing::strategies::noop::Noop,
types::{BrokerMessage, InnerMessage, Message, Partition, Topic},
};
use chrono::Utc;

#[test]
Expand All @@ -110,23 +113,6 @@ mod tests {
Ok(value)
}

struct Noop {}
impl ProcessingStrategy<String> for Noop {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
fn submit(&mut self, _message: Message<String>) -> Result<(), SubmitError<String>> {
Ok(())
}
fn terminate(&mut self) {}
fn join(
&mut self,
_timeout: Option<Duration>,
) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
}

let mut strategy = RunTask::new(identity, Noop {});

let partition = Partition::new(Topic::new("test"), 0);
Expand Down
19 changes: 19 additions & 0 deletions tests/processing/strategies/test_noop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from datetime import datetime

from arroyo.processing.strategies.noop import Noop
from arroyo.types import Message, Partition, Topic, Value


def test_noop() -> None:
"""
Test that the interface of the noop strategy is correct.
"""
now = datetime.now()

strategy = Noop()
partition = Partition(Topic("topic"), 0)

strategy.submit(Message(Value(b"hello", {partition: 1}, now)))
strategy.poll()
strategy.submit(Message(Value(b"world", {partition: 2}, now)))
strategy.poll()
Loading