docs(samples): Add Kafka to Dataflow snippet (#12013)

* docs(samples): Add Dataflow snippet to read from Kafka

* Address review feedback

* Fix linter error
VeronicaWasson authored Aug 20, 2024
1 parent 5fdfd82 commit ffa16d8
Showing 7 changed files with 206 additions and 1 deletion.
40 changes: 40 additions & 0 deletions dataflow/snippets/Dockerfile
@@ -0,0 +1,40 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# NOTE: The KafkaIO connector for Python requires the JRE to be installed
# in the execution environment. This Dockerfile enables the
# "dataflow_kafka_read" snippet to be tested without installing the JRE
# on the host machine. This Dockerfile is derived from the
# dataflow/custom-containers/ubuntu sample.

FROM ubuntu:focal

WORKDIR /pipeline

COPY --from=apache/beam_python3.11_sdk:2.57.0 /opt/apache/beam /opt/apache/beam
ENTRYPOINT [ "/opt/apache/beam/boot" ]

COPY requirements.txt .
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        curl python3-distutils default-jre docker.io \
    && rm -rf /var/lib/apt/lists/* \
    && update-alternatives --install /usr/bin/python python /usr/bin/python3 10 \
    && curl https://bootstrap.pypa.io/get-pip.py | python \
    # Install the requirements.
    && pip install --no-cache-dir -r requirements.txt \
    && pip check


COPY read_kafka.py ./
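
The container built from this Dockerfile is how the new test (test_read_kafka.py, below) runs the snippet without installing the JRE on the host. A minimal sketch of that flow using the Docker SDK for Python, assuming a local Docker daemon and treating the image tag and pipeline flags as placeholder values:

import docker

client = docker.from_env()

# Build the pipeline image from this Dockerfile (tag name is illustrative).
client.images.build(path='.', tag='kafka-pipeline:1')

# Run the snippet inside the container, where the JRE required by the
# KafkaIO connector is already installed. Flag values are placeholders.
client.containers.run(
    image='kafka-pipeline:1',
    command='/pipeline/read_kafka.py --topic my-topic --bootstrap_server localhost:9092 --output /out/results',
    volumes=['/var/run/docker.sock:/var/run/docker.sock'],
    network_mode='host',
    entrypoint='python')
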
2 changes: 1 addition & 1 deletion dataflow/snippets/batch_write_storage.py
@@ -35,7 +35,7 @@ def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
wordsList = ["1", "2", "3", "4"]
options = MyOptions()

-with beam.Pipeline(options=options) as pipeline:
+with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
    (
        pipeline
        | "Create elements" >> beam.Create(wordsList)
69 changes: 69 additions & 0 deletions dataflow/snippets/read_kafka.py
@@ -0,0 +1,69 @@
#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataflow_kafka_read]
import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    # --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    # --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--topic')
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": options.bootstrap_server
                },
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output,
                file_name_suffix='.txt',
                num_shards=1)
        )
# [END dataflow_kafka_read]


if __name__ == "__main__":
read_from_kafka()
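
The options comment in the snippet lists example flags. A minimal sketch of supplying them when invoking the script directly, with placeholder broker, topic, and output values (on Dataflow the output would be a Cloud Storage path and the usual Dataflow options would be added):

import subprocess

# Placeholder values; not part of the commit.
subprocess.run(
    [
        'python', 'read_kafka.py',
        '--topic', 'my-topic',
        '--bootstrap_server', 'localhost:9092',
        '--output', '/tmp/kafka-output/results',
    ],
    check=True,
)
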
2 changes: 2 additions & 0 deletions dataflow/snippets/requirements-test.txt
@@ -1 +1,3 @@
pytest==8.2.0
docker==7.1.0

1 change: 1 addition & 0 deletions dataflow/snippets/requirements.txt
@@ -1 +1,2 @@
apache-beam[gcp]==2.50.0
kafka-python==2.0.2
4 changes: 4 additions & 0 deletions dataflow/snippets/tests/test_batch_write_storage.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import sys
import uuid

@@ -32,6 +33,9 @@ def setup_and_teardown() -> None:
        yield
    finally:
        bucket.delete(force=True)
        # Ensure that PipelineOptions subclasses have been cleaned up between tests
        # See https://github.com/apache/beam/issues/18197
        gc.collect()


def test_write_to_cloud_storage(setup_and_teardown: None) -> None:
89 changes: 89 additions & 0 deletions dataflow/snippets/tests/test_read_kafka.py
@@ -0,0 +1,89 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
import time
import uuid

import docker

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import NoBrokersAvailable

import pytest


BOOTSTRAP_SERVER = 'localhost:9092'
TOPIC_NAME = f'topic-{uuid.uuid4()}'
CONTAINER_IMAGE_NAME = 'kafka-pipeline:1'


@pytest.fixture(scope='module', autouse=True)
def kafka_container() -> None:
    # Start a containerized Kafka server.
    docker_client = docker.from_env()
    container = docker_client.containers.run('apache/kafka:3.7.0', network_mode='host', detach=True)
    try:
        create_topic()
        yield
    finally:
        container.stop()


def create_topic() -> None:
# Try to create a Kafka topic. We might need to wait for the Kafka service to start.
for _ in range(1, 10):
try:
client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
topics = []
topics.append(NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1))
client.create_topics(topics)
break
except NoBrokersAvailable:
time.sleep(5)


def test_read_from_kafka(tmp_path: Path) -> None:
    file_name_prefix = f'output-{uuid.uuid4()}'
    file_name = f'{tmp_path}/{file_name_prefix}-00000-of-00001.txt'

    # Send some messages to Kafka
    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER)
    for i in range(0, 5):
        message = f'event-{i}'
        producer.send(TOPIC_NAME, message.encode())

    # Build a container image for the pipeline.
    client = docker.from_env()
    client.images.build(path='./', tag=CONTAINER_IMAGE_NAME)

    # Run the pipeline.
    client.containers.run(
        image=CONTAINER_IMAGE_NAME,
        command=f'/pipeline/read_kafka.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER} --topic {TOPIC_NAME}',
        volumes=['/var/run/docker.sock:/var/run/docker.sock', f'{tmp_path}/:/out'],
        network_mode='host',
        entrypoint='python')

    # Verify the pipeline wrote the Kafka messages to the output file.
    with open(file_name, 'r') as f:
        text = f.read()
        for i in range(0, 5):
            assert f'event-{i}' in text


if __name__ == "__main__":
    test_read_from_kafka()
