From 6507cfcb7dc25f891ecf2da48399c6b60ff96c08 Mon Sep 17 00:00:00 2001
From: Tianzi Cai
Date: Thu, 26 Sep 2019 14:48:15 -0700
Subject: [PATCH] Pub/Sub to GCS using Dataflow Tutorial [(#2385)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2385)

* Add README
* Add requirements.txt
* Add sample
* Update readme
* Working example
* Submit for review
* lint
* Remove Java links
* David's suggestions
* nits: license header, docstring, indentation
* Add a note, comments, input args, op name
* change message to str, json dumps batch
* update requirement
* Add test, should pass lint & py36
* OK to not delete topic
* use storage client
* update storage client to gcs client
---
 samples/snippets/PubSubToGCS.py      | 120 +++++++++++++++
 samples/snippets/PubSubToGCS_test.py | 100 +++++++++++++
 samples/snippets/README.md           | 211 +++++++++++++++++++++++++++
 samples/snippets/requirements.txt    |   1 +
 4 files changed, 432 insertions(+)
 create mode 100644 samples/snippets/PubSubToGCS.py
 create mode 100644 samples/snippets/PubSubToGCS_test.py
 create mode 100644 samples/snippets/README.md
 create mode 100644 samples/snippets/requirements.txt
diff --git a/samples/snippets/PubSubToGCS.py b/samples/snippets/PubSubToGCS.py
new file mode 100644
index 000000000..f003e3362
--- /dev/null
+++ b/samples/snippets/PubSubToGCS.py
@@ -0,0 +1,120 @@
+# Copyright 2019 Google LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START pubsub_to_gcs]
+import argparse
+import datetime
+import json
+import logging
+
+import apache_beam as beam
+import apache_beam.transforms.window as window
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+class GroupWindowsIntoBatches(beam.PTransform):
+    """A composite transform that groups Pub/Sub messages based on publish
+    time and outputs a list of dictionaries, where each contains one message
+    and its publish timestamp.
+    """
+
+    def __init__(self, window_size):
+        # Convert minutes into seconds.
+        self.window_size = int(window_size * 60)
+
+    def expand(self, pcoll):
+        return (pcoll
+                # Assigns window info to each Pub/Sub message based on its
+                # publish timestamp.
+                | 'Window into Fixed Intervals' >> beam.WindowInto(
+                    window.FixedWindows(self.window_size))
+                | 'Add timestamps to messages' >> (beam.ParDo(AddTimestamps()))
+                # Use a dummy key to group the elements in the same window.
+                # Note that all the elements in one window must fit into memory
+                # for this. If the windowed elements do not fit into memory,
+                # please consider using `beam.util.BatchElements`.
+                # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
+                | 'Add Dummy Key' >> beam.Map(lambda elem: (None, elem))
+                | 'Groupby' >> beam.GroupByKey()
+                | 'Abandon Dummy Key' >> beam.MapTuple(lambda _, val: val))
+
+
+class AddTimestamps(beam.DoFn):
+
+    def process(self, element, publish_time=beam.DoFn.TimestampParam):
+        """Processes each incoming windowed element by extracting the Pub/Sub
+        message and its publish timestamp into a dictionary. `publish_time`
+        defaults to the publish timestamp returned by the Pub/Sub server. It
+        is bound to each element by Beam at runtime.
+        """
+
+        yield {
+            'message_body': element.decode('utf-8'),
+            'publish_time': datetime.datetime.utcfromtimestamp(
+                float(publish_time)).strftime("%Y-%m-%d %H:%M:%S.%f"),
+        }
+
+
+class WriteBatchesToGCS(beam.DoFn):
+
+    def __init__(self, output_path):
+        self.output_path = output_path
+
+    def process(self, batch, window=beam.DoFn.WindowParam):
+        """Write one batch per file to a Google Cloud Storage bucket. """
+
+        ts_format = '%H:%M'
+        window_start = window.start.to_utc_datetime().strftime(ts_format)
+        window_end = window.end.to_utc_datetime().strftime(ts_format)
+        filename = '-'.join([self.output_path, window_start, window_end])
+
+        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode='w') as f:
+            for element in batch:
+                f.write('{}\n'.format(json.dumps(element)).encode('utf-8'))
+
+
+def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
+    # `save_main_session` is set to true because some DoFns rely on
+    # globally imported modules.
+    pipeline_options = PipelineOptions(
+        pipeline_args, streaming=True, save_main_session=True)
+
+    with beam.Pipeline(options=pipeline_options) as pipeline:
+        (pipeline
+         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
+         | 'Window into' >> GroupWindowsIntoBatches(window_size)
+         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(output_path)))
+
+
+if __name__ == '__main__':  # noqa
+    logging.getLogger().setLevel(logging.INFO)
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--input_topic',
+        help='The Cloud Pub/Sub topic to read from.\n'
+             '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".')
+    parser.add_argument(
+        '--window_size',
+        type=float,
+        default=1.0,
+        help='Output file\'s window size in number of minutes.')
+    parser.add_argument(
+        '--output_path',
+        help='GCS Path of the output file including filename prefix.')
+    known_args, pipeline_args = parser.parse_known_args()
+
+    run(known_args.input_topic, known_args.output_path, known_args.window_size,
+        pipeline_args)
+# [END pubsub_to_gcs]
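A note on the in-code comment above: for windows whose contents may not fit in memory, the comment points to `beam.util.BatchElements`. The sketch below is not part of the patch; it shows roughly what that variant could look like. The class name and batch sizes are illustrative assumptions, and `AddTimestamps` is the DoFn defined in `PubSubToGCS.py` above. Because one window may then emit several batches, `WriteBatchesToGCS` would also need to make its filenames unique per batch.

```python
import apache_beam as beam
import apache_beam.transforms.window as window


class GroupWindowsIntoBoundedBatches(beam.PTransform):
    """Hypothetical variant of GroupWindowsIntoBatches with bounded batches."""

    def __init__(self, window_size, max_batch_size=500):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)
        self.max_batch_size = max_batch_size

    def expand(self, pcoll):
        return (pcoll
                | 'Window into Fixed Intervals' >> beam.WindowInto(
                    window.FixedWindows(self.window_size))
                # AddTimestamps is the DoFn from PubSubToGCS.py above.
                | 'Add timestamps to messages' >> beam.ParDo(AddTimestamps())
                # Emit lists of at most max_batch_size elements instead of
                # collecting an entire window in memory; a single window can
                # therefore produce more than one batch.
                | 'Batch elements' >> beam.transforms.util.BatchElements(
                    min_batch_size=1, max_batch_size=self.max_batch_size))
```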
diff --git a/samples/snippets/PubSubToGCS_test.py b/samples/snippets/PubSubToGCS_test.py
new file mode 100644
index 000000000..644cf0865
--- /dev/null
+++ b/samples/snippets/PubSubToGCS_test.py
@@ -0,0 +1,100 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing as mp
+import os
+import pytest
+import subprocess as sp
+import tempfile
+import time
+import uuid
+
+import apache_beam as beam
+from google.cloud import pubsub_v1
+
+
+PROJECT = os.environ['GCLOUD_PROJECT']
+BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
+TOPIC = 'test-topic'
+UUID = uuid.uuid4().hex
+
+
+@pytest.fixture
+def publisher_client():
+    yield pubsub_v1.PublisherClient()
+
+
+@pytest.fixture
+def topic_path(publisher_client):
+    topic_path = publisher_client.topic_path(PROJECT, TOPIC)
+
+    try:
+        publisher_client.delete_topic(topic_path)
+    except Exception:
+        pass
+
+    response = publisher_client.create_topic(topic_path)
+    yield response.name
+
+
+def _infinite_publish_job(publisher_client, topic_path):
+    while True:
+        future = publisher_client.publish(
+            topic_path, data='Hello World!'.encode('utf-8'))
+        future.result()
+        time.sleep(10)
+
+
+def test_run(publisher_client, topic_path):
+    """This is an integration test that runs `PubSubToGCS.py` in its entirety.
+    It checks for output files on GCS.
+    """
+
+    # Use one process to publish messages to a topic.
+    publish_process = mp.Process(
+        target=lambda: _infinite_publish_job(publisher_client, topic_path))
+
+    # Use another process to run the streaming pipeline that should write one
+    # file to GCS every minute (according to the default window size).
+    pipeline_process = mp.Process(
+        target=lambda: sp.call([
+            'python', 'PubSubToGCS.py',
+            '--project', PROJECT,
+            '--runner', 'DirectRunner',
+            '--temp_location', tempfile.mkdtemp(),
+            '--input_topic', topic_path,
+            '--output_path', 'gs://{}/pubsub/{}/output'.format(BUCKET, UUID),
+        ])
+    )
+
+    publish_process.start()
+    pipeline_process.start()
+
+    # Times out the streaming pipeline after 90 seconds.
+    pipeline_process.join(timeout=90)
+    # Immediately kills the publish process after the pipeline shuts down.
+    publish_process.join(timeout=0)
+
+    pipeline_process.terminate()
+    publish_process.terminate()
+
+    # Check for output files on GCS.
+    gcs_client = beam.io.gcp.gcsio.GcsIO()
+    # This returns a dictionary.
+    files = gcs_client.list_prefix('gs://{}/pubsub/{}'.format(BUCKET, UUID))
+    assert len(files) > 0
+
+    # Clean up. Delete topic. Delete files.
+    publisher_client.delete_topic(topic_path)
+    gcs_client.delete_batch(list(files))
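The test above drives the sample by shelling out to `python PubSubToGCS.py` with `DirectRunner`. For quick local experiments you could also call the module's `run()` function directly, as in the rough sketch below; it is not part of the patch, the project, topic, and bucket names are placeholders, and with `streaming=True` on `DirectRunner` the call blocks until you interrupt it.

```python
import tempfile

# Assumes the current working directory contains PubSubToGCS.py.
from PubSubToGCS import run

run(
    input_topic='projects/your-google-cloud-project-id/topics/cron-topic',
    output_path='gs://your-gcs-bucket/samples/output',
    window_size=1.0,
    pipeline_args=[
        '--project=your-google-cloud-project-id',
        '--runner=DirectRunner',
        # Mirrors the test above, which also passes a temp_location.
        '--temp_location={}'.format(tempfile.mkdtemp()),
    ],
)
```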
diff --git a/samples/snippets/README.md b/samples/snippets/README.md
new file mode 100644
index 000000000..dd24d4c5b
--- /dev/null
+++ b/samples/snippets/README.md
@@ -0,0 +1,211 @@
+# Stream Cloud Pub/Sub with Cloud Dataflow
+
+Sample(s) showing how to use [Google Cloud Pub/Sub] with [Google Cloud Dataflow].
+
+## Before you begin
+
+1. Install the [Cloud SDK].
+   > *Note:* This is not required in
+   > [Cloud Shell]
+   > since it already has the Cloud SDK pre-installed.
+
+1. Create a new Google Cloud project via the
+   [*New Project* page],
+   or via the `gcloud` command line tool.
+
+   ```sh
+   export PROJECT_NAME=your-google-cloud-project-id
+   gcloud projects create $PROJECT_NAME
+   ```
+
+1. [Enable billing].
+
+1. Set up the Cloud SDK for your GCP project.
+
+   ```sh
+   gcloud init
+   ```
+
+1. [Enable the APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,pubsub,cloudresourcemanager.googleapis.com,cloudscheduler.googleapis.com,appengine.googleapis.com): Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Cloud Resource Manager, and App Engine.
+
+1. Create a service account JSON key via the
+   [*Create service account key* page],
+   or via the `gcloud` command line tool.
+   Here is how to do it through the *Create service account key* page.
+
+   * From the **Service account** list, select **New service account**.
+   * In the **Service account name** field, enter a name.
+   * From the **Role** list, select **Project > Owner** **(*)**.
+   * Click **Create**. A JSON file that contains your key downloads to your computer.
+
+   Alternatively, you can use `gcloud` through the command line.
+
+   ```sh
+   export PROJECT_NAME=$(gcloud config get-value project)
+   export SA_NAME=samples
+   export IAM_ACCOUNT=$SA_NAME@$PROJECT_NAME.iam.gserviceaccount.com
+
+   # Create the service account.
+   gcloud iam service-accounts create $SA_NAME --display-name $SA_NAME
+
+   # Set the role to Project Owner (*).
+   gcloud projects add-iam-policy-binding $PROJECT_NAME \
+     --member serviceAccount:$IAM_ACCOUNT \
+     --role roles/owner
+
+   # Create a JSON file with the service account credentials.
+   gcloud iam service-accounts keys create path/to/your/credentials.json \
+     --iam-account=$IAM_ACCOUNT
+   ```
+
+   > **(*)** *Note:* The **Role** field authorizes your service account to access resources.
+   > You can view and change this field later by using the
+   > [GCP Console IAM page].
+   > If you are developing a production app, specify more granular permissions than **Project > Owner**.
+   > For more information, see
+   > [Granting roles to service accounts].
+
+   For more information, see
+   [Creating and managing service accounts].
+
+1. Set your `GOOGLE_APPLICATION_CREDENTIALS` environment variable to point to your service account key file.
+
+   ```sh
+   export GOOGLE_APPLICATION_CREDENTIALS=path/to/your/credentials.json
+   ```
+
+1. Create a Cloud Storage bucket.
+
+   ```bash
+   export BUCKET_NAME=your-gcs-bucket
+
+   gsutil mb gs://$BUCKET_NAME
+   ```
+
+1. Start a [Google Cloud Scheduler] job that publishes one message to a [Google Cloud Pub/Sub] topic every minute. This will create an [App Engine] app if one has never been created on the project.
+
+   ```bash
+   # Create a Pub/Sub topic.
+   gcloud pubsub topics create cron-topic
+
+   # Create a Cloud Scheduler job
+   gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
+     --topic=cron-topic --message-body="Hello!"
+
+   # Run the job.
+   gcloud scheduler jobs run publisher-job
+   ```
+
+## Setup
+
+The following instructions will help you prepare your development environment.
+
+1. [Install Python and virtualenv].
+
+1. Clone the `python-docs-samples` repository.
+
+   ```bash
+   git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
+   ```
+
+1. Navigate to the sample code directory.
+
+   ```bash
+   cd python-docs-samples/pubsub/streaming-analytics
+   ```
+
+1. Create a virtual environment and activate it.
+
+   ```bash
+   virtualenv env
+   source env/bin/activate
+   ```
+   > Once you are finished with the tutorial, you can deactivate the virtualenv and go back to your global Python environment by running `deactivate`.
+
+1. Install the sample requirements.
+
+   ```bash
+   pip install -U -r requirements.txt
+   ```
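Optionally, you can sanity-check your credentials and the `cron-topic` created earlier before launching the pipeline. The short script below is not part of the sample; it uses the `pubsub_v1` client that `apache-beam[gcp]` already installs (the test file relies on the same import), and assumes the `PROJECT_NAME` environment variable from the steps above.

```python
import os

from google.cloud import pubsub_v1

# Assumes GOOGLE_APPLICATION_CREDENTIALS is set and cron-topic exists.
project = os.environ['PROJECT_NAME']
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, 'cron-topic')

# Publish a single test message and wait for the server-assigned ID.
future = publisher.publish(topic_path, data=b'Hello from the setup check!')
print('Published message ID:', future.result())
```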
+## Streaming Analytics
+
+### Google Cloud Pub/Sub to Google Cloud Storage
+
+* [PubSubToGCS.py](PubSubToGCS.py)
+
+The following example runs a streaming pipeline. It reads messages from a Pub/Sub topic, windows them into fixed-size intervals, and writes one file per window to a GCS location.
+
++ `--project`: sets the Google Cloud project ID to run the pipeline on
++ `--input_topic`: sets the input Pub/Sub topic to read messages from
++ `--output_path`: sets the output GCS path prefix to write files to
++ `--runner`: specifies the runner used to run the pipeline; if not set to `DataflowRunner`, `DirectRunner` is used
++ `--window_size [optional]`: specifies the window size in minutes; defaults to 1.0
++ `--temp_location`: a GCS path for the pipeline to stage temporary job files; required when running on Dataflow
+
+```bash
+python PubSubToGCS.py \
+  --project=$PROJECT_NAME \
+  --input_topic=projects/$PROJECT_NAME/topics/$TOPIC_NAME \
+  --output_path=gs://$BUCKET_NAME/samples/output \
+  --runner=DataflowRunner \
+  --window_size=2 \
+  --temp_location=gs://$BUCKET_NAME/temp
+```
+
+After the job has been submitted, you can check its status in the [GCP Console Dataflow page].
+
+You can also check the output in your GCS bucket using the command line below or in the [GCP Console Storage page]. You may need to wait a few minutes for the files to appear.
+
+```bash
+gsutil ls gs://$BUCKET_NAME/samples/
+```
+
+## Cleanup
+
+1. Delete the [Google Cloud Scheduler] job.
+
+   ```bash
+   gcloud scheduler jobs delete publisher-job
+   ```
+
+1. Press `Ctrl+C` to stop the program in your terminal. Note that this does not actually stop the job if you use `DataflowRunner`.
+
+1. If you use `DirectRunner`, you can skip this step. Stop the Dataflow job in the [GCP Console Dataflow page]. Cancel the job instead of draining it. This may take a few minutes.
+
+1. Delete the topic. [Google Cloud Dataflow] will automatically delete the subscription associated with the streaming pipeline when the job is canceled.
+
+   ```bash
+   gcloud pubsub topics delete cron-topic
+   ```
+
+1. Lastly, to avoid incurring charges to your GCP account for the resources created in this tutorial:
+
+   ```bash
+   # Delete only the files created by this sample.
+   gsutil -m rm -rf "gs://$BUCKET_NAME/samples/output*"
+
+   # [optional] Remove the Cloud Storage bucket.
+   gsutil rb gs://$BUCKET_NAME
+   ```
+
+[Apache Beam]: https://beam.apache.org/
+[Google Cloud Pub/Sub]: https://cloud.google.com/pubsub/docs/
+[Google Cloud Dataflow]: https://cloud.google.com/dataflow/docs/
+[Google Cloud Scheduler]: https://cloud.google.com/scheduler/docs/
+[App Engine]: https://cloud.google.com/appengine/docs/
+
+[Cloud SDK]: https://cloud.google.com/sdk/docs/
+[Cloud Shell]: https://console.cloud.google.com/cloudshell/editor/
+[*New Project* page]: https://console.cloud.google.com/projectcreate
+[Enable billing]: https://cloud.google.com/billing/docs/how-to/modify-project/
+[*Create service account key* page]: https://console.cloud.google.com/apis/credentials/serviceaccountkey/
+[GCP Console IAM page]: https://console.cloud.google.com/iam-admin/iam/
+[Granting roles to service accounts]: https://cloud.google.com/iam/docs/granting-roles-to-service-accounts/
+[Creating and managing service accounts]: https://cloud.google.com/iam/docs/creating-managing-service-accounts/
+
+[Install Python and virtualenv]: https://cloud.google.com/python/setup/
+
+[GCP Console create Dataflow job page]: https://console.cloud.google.com/dataflow/createjob/
+[GCP Console Dataflow page]: https://console.cloud.google.com/dataflow/
+[GCP Console Storage page]: https://console.cloud.google.com/storage/
diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt
new file mode 100644
index 000000000..d1ce1adeb
--- /dev/null
+++ b/samples/snippets/requirements.txt
@@ -0,0 +1 @@
+apache-beam[gcp]==2.15.0
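One last reference point: each file written by `WriteBatchesToGCS` contains one JSON object per line with the `message_body` and `publish_time` keys built in `AddTimestamps`. Before you run the cleanup steps, you can read a file back with a sketch like the one below; it assumes the `google-cloud-storage` client is available (it is not listed in `requirements.txt`) and uses the bucket and prefix from the example command.

```python
import json

from google.cloud import storage

client = storage.Client()
bucket = client.bucket('your-gcs-bucket')  # $BUCKET_NAME from the README.

# Take the first output file written under the sample prefix.
blob = next(iter(bucket.list_blobs(prefix='samples/output')))
for line in blob.download_as_string().decode('utf-8').splitlines():
    record = json.loads(line)
    print(record['publish_time'], record['message_body'])
```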