From ffa16d879e131ba26bb55b48835c070f1d599460 Mon Sep 17 00:00:00 2001
From: Veronica Wasson <3992422+VeronicaWasson@users.noreply.github.com>
Date: Tue, 20 Aug 2024 13:03:59 -0700
Subject: [PATCH] docs(samples): Add Kafka to Dataflow snippet (#12013)

* docs(samples): Add Dataflow snippet to read from Kafka

* Address review feedback

* Fix linter error
---
 dataflow/snippets/Dockerfile                |  40 +++++++++
 dataflow/snippets/batch_write_storage.py    |   2 +-
 dataflow/snippets/read_kafka.py             |  69 ++++++++++++++
 dataflow/snippets/requirements-test.txt     |   2 +
 dataflow/snippets/requirements.txt          |   1 +
 .../tests/test_batch_write_storage.py       |   4 +
 dataflow/snippets/tests/test_read_kafka.py  |  89 +++++++++++++++++++
 7 files changed, 206 insertions(+), 1 deletion(-)
 create mode 100644 dataflow/snippets/Dockerfile
 create mode 100644 dataflow/snippets/read_kafka.py
 create mode 100644 dataflow/snippets/tests/test_read_kafka.py

diff --git a/dataflow/snippets/Dockerfile b/dataflow/snippets/Dockerfile
new file mode 100644
index 000000000000..ef2f9afdb8fe
--- /dev/null
+++ b/dataflow/snippets/Dockerfile
@@ -0,0 +1,40 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# NOTE: The KafkaIO connector for Python requires the JRE to be installed
+# in the execution environment. This Dockerfile enables the
+# "dataflow_kafka_read" snippet to be tested without installing the JRE
+# on the host machine. This Dockerfile is derived from the
+# dataflow/custom-containers/ubuntu sample.
+
+FROM ubuntu:focal
+
+WORKDIR /pipeline
+
+COPY --from=apache/beam_python3.11_sdk:2.57.0 /opt/apache/beam /opt/apache/beam
+ENTRYPOINT [ "/opt/apache/beam/boot" ]
+
+COPY requirements.txt .
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends \
+        curl python3-distutils default-jre docker.io \
+    && rm -rf /var/lib/apt/lists/* \
+    && update-alternatives --install /usr/bin/python python /usr/bin/python3 10 \
+    && curl https://bootstrap.pypa.io/get-pip.py | python \
+    # Install the requirements.
+    && pip install --no-cache-dir -r requirements.txt \
+    && pip check
+
+
+COPY read_kafka.py ./
diff --git a/dataflow/snippets/batch_write_storage.py b/dataflow/snippets/batch_write_storage.py
index b86a1923df8c..69b5ab9b5475 100644
--- a/dataflow/snippets/batch_write_storage.py
+++ b/dataflow/snippets/batch_write_storage.py
@@ -35,7 +35,7 @@ def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
     wordsList = ["1", "2", "3", "4"]
     options = MyOptions()
 
-    with beam.Pipeline(options=options) as pipeline:
+    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
         (
             pipeline
             | "Create elements" >> beam.Create(wordsList)
diff --git a/dataflow/snippets/read_kafka.py b/dataflow/snippets/read_kafka.py
new file mode 100644
index 000000000000..58466ebb97dc
--- /dev/null
+++ b/dataflow/snippets/read_kafka.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START dataflow_kafka_read]
+import argparse
+
+import apache_beam as beam
+
+from apache_beam import window
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.textio import WriteToText
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+def read_from_kafka() -> None:
+
+    # Parse the pipeline options passed into the application. Example:
+    #   --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
+    #   --output=$CLOUD_STORAGE_BUCKET --streaming
+    # For more information, see
+    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
+    class MyOptions(PipelineOptions):
+        @staticmethod
+        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
+            parser.add_argument('--topic')
+            parser.add_argument('--bootstrap_server')
+            parser.add_argument('--output')
+
+    options = MyOptions()
+    with beam.Pipeline(options=options) as pipeline:
+        (
+            pipeline
+            # Read messages from an Apache Kafka topic.
+            | ReadFromKafka(
+                consumer_config={
+                    "bootstrap.servers": options.bootstrap_server
+                },
+                topics=[options.topic],
+                with_metadata=False,
+                max_num_records=5,
+                start_read_time=0
+            )
+            # The previous step creates a key-value collection, keyed by message ID.
+            # The values are the message payloads.
+            | beam.Values()
+            # Subdivide the output into fixed 5-second windows.
+            | beam.WindowInto(window.FixedWindows(5))
+            | WriteToText(
+                file_path_prefix=options.output,
+                file_name_suffix='.txt',
+                num_shards=1)
+        )
+# [END dataflow_kafka_read]
+
+
+if __name__ == "__main__":
+    read_from_kafka()
diff --git a/dataflow/snippets/requirements-test.txt b/dataflow/snippets/requirements-test.txt
index 15d066af3197..f7b11f32fc53 100644
--- a/dataflow/snippets/requirements-test.txt
+++ b/dataflow/snippets/requirements-test.txt
@@ -1 +1,3 @@
 pytest==8.2.0
+docker==7.1.0
+
diff --git a/dataflow/snippets/requirements.txt b/dataflow/snippets/requirements.txt
index 5f8b2a4d0adc..ad880b7f8e69 100644
--- a/dataflow/snippets/requirements.txt
+++ b/dataflow/snippets/requirements.txt
@@ -1 +1,2 @@
 apache-beam[gcp]==2.50.0
+kafka-python==2.0.2
diff --git a/dataflow/snippets/tests/test_batch_write_storage.py b/dataflow/snippets/tests/test_batch_write_storage.py
index 9959a7545d5a..f4d6d14cb420 100644
--- a/dataflow/snippets/tests/test_batch_write_storage.py
+++ b/dataflow/snippets/tests/test_batch_write_storage.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import gc
 import sys
 import uuid
 
@@ -32,6 +33,9 @@ def setup_and_teardown() -> None:
         yield
     finally:
         bucket.delete(force=True)
+        # Ensure that PipelineOptions subclasses have been cleaned up between tests
+        # See https://github.com/apache/beam/issues/18197
+        gc.collect()
 
 
 def test_write_to_cloud_storage(setup_and_teardown: None) -> None:
diff --git a/dataflow/snippets/tests/test_read_kafka.py b/dataflow/snippets/tests/test_read_kafka.py
new file mode 100644
index 000000000000..0dd519a91f44
--- /dev/null
+++ b/dataflow/snippets/tests/test_read_kafka.py
@@ -0,0 +1,89 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pathlib import Path
+import time
+import uuid
+
+import docker
+
+from kafka import KafkaProducer
+from kafka.admin import KafkaAdminClient, NewTopic
+from kafka.errors import NoBrokersAvailable
+
+import pytest
+
+
+BOOTSTRAP_SERVER = 'localhost:9092'
+TOPIC_NAME = f'topic-{uuid.uuid4()}'
+CONTAINER_IMAGE_NAME = 'kafka-pipeline:1'
+
+
+@pytest.fixture(scope='module', autouse=True)
+def kafka_container() -> None:
+    # Start a containerized Kafka server.
+    docker_client = docker.from_env()
+    container = docker_client.containers.run('apache/kafka:3.7.0', network_mode='host', detach=True)
+    try:
+        create_topic()
+        yield
+    finally:
+        container.stop()
+
+
+def create_topic() -> None:
+    # Try to create a Kafka topic. We might need to wait for the Kafka service to start.
+    for _ in range(1, 10):
+        try:
+            client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
+            topics = []
+            topics.append(NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1))
+            client.create_topics(topics)
+            break
+        except NoBrokersAvailable:
+            time.sleep(5)
+
+
+def test_read_from_kafka(tmp_path: Path) -> None:
+
+    file_name_prefix = f'output-{uuid.uuid4()}'
+    file_name = f'{tmp_path}/{file_name_prefix}-00000-of-00001.txt'
+
+    # Send some messages to Kafka
+    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER)
+    for i in range(0, 5):
+        message = f'event-{i}'
+        producer.send(TOPIC_NAME, message.encode())
+
+    # Build a container image for the pipeline.
+    client = docker.from_env()
+    client.images.build(path='./', tag=CONTAINER_IMAGE_NAME)
+
+    # Run the pipeline.
+    client.containers.run(
+        image=CONTAINER_IMAGE_NAME,
+        command=f'/pipeline/read_kafka.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER} --topic {TOPIC_NAME}',
+        volumes=['/var/run/docker.sock:/var/run/docker.sock', f'{tmp_path}/:/out'],
+        network_mode='host',
+        entrypoint='python')
+
+    # Verify the pipeline wrote the Kafka messages to the output file.
+    with open(file_name, 'r') as f:
+        text = f.read()
+        for i in range(0, 5):
+            assert f'event-{i}' in text
+
+
+if __name__ == "__main__":
+    test_read_from_kafka()
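
Usage note (reviewer sketch, not part of the diff above): the test runs the snippet inside the container with entrypoint='python' and the --output/--bootstrap_server/--topic flags. A minimal local equivalent is sketched below; it assumes a Kafka broker reachable at localhost:9092, a topic named "events", a JRE on the host (required by the KafkaIO cross-language transform, per the Dockerfile NOTE), and the packages from requirements.txt installed. The broker address, topic name, and output prefix are placeholders.

    # Hypothetical local run of read_kafka.py, mirroring the container
    # invocation in tests/test_read_kafka.py.
    import subprocess

    subprocess.run(
        [
            "python", "read_kafka.py",
            "--topic", "events",                      # placeholder topic name
            "--bootstrap_server", "localhost:9092",   # placeholder broker address
            "--output", "/tmp/read_kafka/out",        # placeholder output file prefix
        ],
        check=True,
    )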