Pub/Sub to GCS using Dataflow Tutorial (GoogleCloudPlatform/python-docs-samples#2385)

* Add README

* Add requirements.txt

* Add sample

* Update readme

* Working example

* Submit for review

* lint

* Remove Java links

* David's suggestions

* nits: license header, docstring, indentation

* Add a note, comments, input args, op name

* change message to str, json dumps batch

* update requirement

* Add test, should pass lint & py36

* OK to not delete topic

* use storage client

* update storage client to gcs client
anguillanneuf authored Sep 26, 2019
Initial commit 6507cfc
Showing 4 changed files with 432 additions and 0 deletions.
samples/snippets/PubSubToGCS.py: 120 additions, 0 deletions
@@ -0,0 +1,120 @@
# Copyright 2019 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START pubsub_to_gcs]
import argparse
import datetime
import json
import logging

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (pcoll
                # Assigns window info to each Pub/Sub message based on its
                # publish timestamp.
                | 'Window into Fixed Intervals' >> beam.WindowInto(
                    window.FixedWindows(self.window_size))
                | 'Add timestamps to messages' >> beam.ParDo(AddTimestamps())
                # Use a dummy key to group the elements in the same window.
                # Note that all the elements in one window must fit into
                # memory for this. If the windowed elements do not fit into
                # memory, please consider using
                # `beam.transforms.util.BatchElements`.
                # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
                | 'Add Dummy Key' >> beam.Map(lambda elem: (None, elem))
                | 'Groupby' >> beam.GroupByKey()
                | 'Abandon Dummy Key' >> beam.MapTuple(lambda _, val: val))


class AddTimestamps(beam.DoFn):

    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """

        yield {
            'message_body': element.decode('utf-8'),
            'publish_time': datetime.datetime.utcfromtimestamp(
                float(publish_time)).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }


class WriteBatchesToGCS(beam.DoFn):

    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket."""

        ts_format = '%H:%M'
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = '-'.join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode='w') as f:
            for element in batch:
                f.write('{}\n'.format(json.dumps(element)).encode('utf-8'))


def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
         | 'Window into' >> GroupWindowsIntoBatches(window_size)
         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(output_path)))


if __name__ == '__main__':  # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_topic',
        help='The Cloud Pub/Sub topic to read from.\n'
             '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".')
    parser.add_argument(
        '--window_size',
        type=float,
        default=1.0,
        help='Output file\'s window size in number of minutes.')
    parser.add_argument(
        '--output_path',
        help='GCS path of the output file including filename prefix.')
    known_args, pipeline_args = parser.parse_known_args()

    run(known_args.input_topic, known_args.output_path,
        known_args.window_size, pipeline_args)
# [END pubsub_to_gcs]
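
The comment inside GroupWindowsIntoBatches points at beam.transforms.util.BatchElements for the case where a single window's elements may not fit into memory. Below is a minimal sketch of that variant, assuming the same AddTimestamps DoFn defined above; the transform name, step labels, and batch sizes are illustrative and are not part of the committed sample.

class GroupMessagesWithBatchElements(beam.PTransform):
    """Hypothetical variant of GroupWindowsIntoBatches that bounds batch
    sizes with BatchElements instead of holding a whole window in memory.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (pcoll
                | 'Window into Fixed Intervals' >> beam.WindowInto(
                    window.FixedWindows(self.window_size))
                | 'Add timestamps to messages' >> beam.ParDo(AddTimestamps())
                # Each emitted batch holds at most max_batch_size elements,
                # so one window never has to fit into memory all at once.
                | 'Batch elements' >> beam.BatchElements(
                    min_batch_size=10, max_batch_size=1000))

Because one window can then produce several batches, WriteBatchesToGCS would also need a unique filename per batch (for example, a per-batch suffix); the committed sample keeps the simpler GroupByKey approach so that exactly one file is written per window.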
samples/snippets/PubSubToGCS_test.py: 100 additions, 0 deletions
@@ -0,0 +1,100 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing as mp
import os
import pytest
import subprocess as sp
import tempfile
import time
import uuid

import apache_beam as beam
from google.cloud import pubsub_v1


PROJECT = os.environ['GCLOUD_PROJECT']
BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
TOPIC = 'test-topic'
UUID = uuid.uuid4().hex


@pytest.fixture
def publisher_client():
    yield pubsub_v1.PublisherClient()


@pytest.fixture
def topic_path(publisher_client):
    topic_path = publisher_client.topic_path(PROJECT, TOPIC)

    try:
        publisher_client.delete_topic(topic_path)
    except Exception:
        pass

    response = publisher_client.create_topic(topic_path)
    yield response.name


def _infinite_publish_job(publisher_client, topic_path):
    while True:
        future = publisher_client.publish(
            topic_path, data='Hello World!'.encode('utf-8'))
        future.result()
        time.sleep(10)


def test_run(publisher_client, topic_path):
    """This is an integration test that runs `PubSubToGCS.py` in its entirety.
    It checks for output files on GCS.
    """

    # Use one process to publish messages to a topic.
    publish_process = mp.Process(
        target=lambda: _infinite_publish_job(publisher_client, topic_path))

    # Use another process to run the streaming pipeline that should write one
    # file to GCS every minute (according to the default window size).
    pipeline_process = mp.Process(
        target=lambda: sp.call([
            'python', 'PubSubToGCS.py',
            '--project', PROJECT,
            '--runner', 'DirectRunner',
            '--temp_location', tempfile.mkdtemp(),
            '--input_topic', topic_path,
            '--output_path', 'gs://{}/pubsub/{}/output'.format(BUCKET, UUID),
        ])
    )

    publish_process.start()
    pipeline_process.start()

    # Give the streaming pipeline up to 90 seconds, then fall through.
    pipeline_process.join(timeout=90)
    # Do not wait on the publish process; it loops forever.
    publish_process.join(timeout=0)

    # Stop both processes.
    pipeline_process.terminate()
    publish_process.terminate()

    # Check for output files on GCS.
    gcs_client = beam.io.gcp.gcsio.GcsIO()
    # list_prefix returns a dictionary that maps file names to sizes.
    files = gcs_client.list_prefix('gs://{}/pubsub/{}'.format(BUCKET, UUID))
    assert len(files) > 0

    # Clean up: delete the topic and the output files.
    publisher_client.delete_topic(topic_path)
    gcs_client.delete_batch(list(files))
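
For reference, here is a hedged sketch, not part of this commit, of launching the same pipeline on the Dataflow service instead of the DirectRunner that the test uses. The project, region, bucket, and topic values are placeholders to substitute with your own.

import subprocess

# Launch PubSubToGCS.py on Dataflow (placeholder values throughout).
subprocess.call([
    'python', 'PubSubToGCS.py',
    '--project', 'your-project-id',
    '--region', 'us-central1',
    '--runner', 'DataflowRunner',
    '--temp_location', 'gs://your-bucket/temp',
    '--input_topic', 'projects/your-project-id/topics/your-topic',
    '--output_path', 'gs://your-bucket/samples/output',
    '--window_size', '2',
])

The same flags could equally be passed on a shell command line: the script itself only parses --input_topic, --window_size, and --output_path, and forwards everything else to Beam as pipeline options.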