Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: patch getting-started example #318

Merged
merged 3 commits into from
Dec 27, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/source/getstarted.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ Here we are using the `RunTask` strategy which runs a custom function over each

.. code-block:: Python

from collections.abc import Mapping
untitaker marked this conversation as resolved.
Show resolved Hide resolved

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import (
CommitOffsets,
ProcessingStrategy,
ProcessingStrategyFactory,
RunTask,
)
from arroyo.types import Commit, Message, Partition, Topic


def handle_message(message: Message[KafkaPayload]) -> Message[KafkaPayload]:
print(f"MSG: {message.payload}")
return message
Expand All @@ -174,10 +186,14 @@ The code above is orchestrated by the Arroyo runtime called `StreamProcessor`.

.. code-block:: Python

from arroyo.processing import StreamProcessor
from arroyo.commit import ONCE_PER_SECOND

processor = StreamProcessor(
consumer=consumer,
topic=TOPIC,
processor_factory=ConsumerStrategyFactory(),
commit_policy=ONCE_PER_SECOND,
)

processor.run()
Expand All @@ -192,6 +208,10 @@ Now we will chain the `Produce` strategy to produce messages on a second topic a

.. code-block:: Python

from arroyo.backends.kafka import KafkaProducer
from arroyo.backends.kafka.configuration import build_kafka_configuration
from arroyo.processing.strategies import Produce

class ConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
"""
The factory manages the lifecycle of the `ProcessingStrategy`.
Expand Down
Loading