docs(samples): Add Dataflow snippet to read from Kafka
VeronicaWasson committed Jul 8, 2024
1 parent a50e316 commit a6f1e0c
Showing 6 changed files with 165 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dataflow/snippets/batch_write_storage.py
@@ -35,7 +35,7 @@ def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

-    with beam.Pipeline(options=options) as pipeline:
+    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
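An editorial aside on the view_as call introduced above (not part of the commit): PipelineOptions holds one underlying set of flags that can be reinterpreted through different options classes, and view_as returns such a typed view. A minimal sketch of the API, assuming apache-beam is installed; the --streaming flag here is only illustrative:

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# One set of flags, read back through the StandardOptions view.
options = PipelineOptions(['--streaming'])
print(options.view_as(StandardOptions).streaming)  # prints True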
72 changes: 72 additions & 0 deletions dataflow/snippets/read_kafka.py
@@ -0,0 +1,72 @@
#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataflow_kafka_read]
import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def read_from_kafka() -> None:

    # Parse the pipeline options passed into the application. Example:
    #   --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #   --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--topic')
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    p = beam.Pipeline(options=options)
    (
        p
        # Read messages from an Apache Kafka topic.
        | ReadFromKafka(
            consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=[options.topic],
            with_metadata=False,
            max_num_records=5,
            start_read_time=0
        )
        # The previous step creates a key-value collection, keyed by message ID.
        # The values are the message payloads.
        | beam.Values()
        # Subdivide the output into fixed 5-second windows.
        | beam.WindowInto(window.FixedWindows(5))
        | WriteToText(
            file_path_prefix=options.output,
            file_name_suffix='.txt',
            num_shards=1)
    )
    p.run()
# [END dataflow_kafka_read]


if __name__ == "__main__":
    read_from_kafka()
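An editorial aside (not part of the committed sample): with with_metadata=False, ReadFromKafka emits key/value pairs, and the beam.Values() step above keeps only the message payloads. A minimal sketch of that step in isolation, using beam.Create as a stand-in for the Kafka source:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    (
        pipeline
        # Stand-in for Kafka records: (key, payload) pairs of bytes.
        | beam.Create([(b'k1', b'event-0'), (b'k2', b'event-1')])
        # Drop the keys and keep the payloads, as read_kafka.py does.
        | beam.Values()
        | beam.Map(print)
    )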
2 changes: 2 additions & 0 deletions dataflow/snippets/requirements-test.txt
@@ -1 +1,3 @@
pytest==8.2.0
docker==7.1.0

1 change: 1 addition & 0 deletions dataflow/snippets/requirements.txt
@@ -1 +1,2 @@
apache-beam[gcp]==2.50.0
kafka-python==2.0.2
4 changes: 4 additions & 0 deletions dataflow/snippets/tests/test_batch_write_storage.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import sys
import uuid

@@ -32,6 +33,9 @@ def setup_and_teardown() -> None:
        yield
    finally:
        bucket.delete(force=True)
        # Ensure that PipelineOptions subclasses have been cleaned up between tests
        # See https://github.com/apache/beam/issues/18197
        gc.collect()


def test_write_to_cloud_storage(setup_and_teardown: None) -> None:
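An editorial aside on the gc.collect() added above (not part of the commit): Beam's PipelineOptions consults PipelineOptions.__subclasses__() when it assembles the full argument parser, and a subclass defined inside one test can linger there until a garbage collection runs, because class objects sit in reference cycles. A minimal sketch of that mechanism, assuming apache-beam is installed; ThrowawayOptions is a hypothetical name:

import gc

from apache_beam.options.pipeline_options import PipelineOptions


def define_throwaway_options() -> None:
    # Stand-in for a PipelineOptions subclass defined inside a test function.
    class ThrowawayOptions(PipelineOptions):
        pass


define_throwaway_options()
# Typically still listed here, even though nothing references the class anymore.
print(any(c.__name__ == 'ThrowawayOptions' for c in PipelineOptions.__subclasses__()))
gc.collect()
# After a collection the stale subclass is gone.
print(any(c.__name__ == 'ThrowawayOptions' for c in PipelineOptions.__subclasses__()))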
85 changes: 85 additions & 0 deletions dataflow/snippets/tests/test_read_kafka.py
@@ -0,0 +1,85 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
import time
from unittest.mock import patch
import uuid

import docker

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

import pytest

from ..read_kafka import read_from_kafka


BOOTSTRAP_SERVER = 'localhost:9092'
TOPIC_NAME = 'my-topic'


@pytest.fixture(scope='module', autouse=True)
def kafka_container() -> None:
    # Start a containerized Kafka server.
    docker_client = docker.from_env()
    container = docker_client.containers.run('apache/kafka:3.7.0', ports={'9092/tcp': 9092}, detach=True)
    try:
        create_topic()
        yield
    finally:
        container.stop()


def create_topic() -> None:
    # Try to create a Kafka topic. We might need to wait for the Kafka service to start.
    for _ in range(1, 10):
        try:
            client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
            topics = []
            topics.append(NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1))
            client.create_topics(topics)
            break
        except ValueError:
            time.sleep(5)


def test_read_from_kafka(tmp_path: Path) -> None:
    file_name_prefix = f'{tmp_path}/output-{uuid.uuid4()}'
    file_name = f'{file_name_prefix}-00000-of-00001.txt'

    # Send some messages to Kafka
    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER)
    for i in range(0, 5):
        message = f'event-{i}'
        producer.send(TOPIC_NAME, message.encode())

    with patch("sys.argv", ["",
                            '--streaming',
                            '--allow_unsafe_triggers',
                            f'--topic={TOPIC_NAME}',
                            f'--bootstrap_server={BOOTSTRAP_SERVER}',
                            f'--output={file_name_prefix}']):
        read_from_kafka()

    # Verify the pipeline wrote events to the output file.
    with open(file_name, 'r') as f:
        text = f.read()
        for i in range(0, 5):
            assert f'event-{i}' in text


if __name__ == "__main__":
    test_read_from_kafka()
