From 05ab5692d6cd516846cbfaa9d332eb63cc951372 Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Wed, 25 Sep 2024 07:29:05 +0000 Subject: [PATCH] docs: Add OpenTelemetry publish sample --- samples/snippets/publisher.py | 88 +++++++++++++++++++++++++++++++ samples/snippets/requirements.txt | 3 ++ 2 files changed, 91 insertions(+) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 73afc8c97..d2be927b8 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -170,6 +170,83 @@ def delete_topic(project_id: str, topic_id: str) -> None: # [END pubsub_delete_topic] +def pubsub_publish_otel_tracing( + topic_project_id: str, trace_project_id: str, topic_id: str +) -> None: + """ + Publish to `topic_id` in `topic_project_id` with OpenTelemetry enabled. + Export the OpenTelemetry traces to Google Cloud Trace in project + `trace_project_id` + + Args: + topic_project_id: project ID of the topic to publish to. + trace_project_id: project ID to export Cloud Trace to. + topic_id: topic ID to publish to. + + Returns: + None + """ + # [START pubsub_publish_otel_tracing] + + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + ) + from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter + from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased + + from google.cloud.pubsub_v1 import PublisherClient + from google.cloud.pubsub_v1.types import PublisherOptions + + # TODO(developer) + # topic_project_id = "your-topic-project-id" + # trace_project_id = "your-trace-project-id" + # topic_id = "your-topic-id" + + # In this sample, we use a Google Cloud Trace to export the OpenTelemetry + # traces: https://cloud.google.com/trace/docs/setup/python-ot + # Choose and configure the exporter for your set up accordingly. + + sampler = ParentBased(root=TraceIdRatioBased(1)) + trace.set_tracer_provider(TracerProvider(sampler=sampler)) + + # Export to Google Trace. + cloud_trace_exporter = CloudTraceSpanExporter( + project_id=trace_project_id, + ) + trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(cloud_trace_exporter) + ) + + # Set the `enable_open_telemetry_tracing` option to True when creating + # the publisher client. This in itself is necessary and sufficient for + # the library to export OpenTelemetry traces. However, where the traces + # must be exported to needs to be configured based on your OpenTelemetry + # set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/ + publisher = PublisherClient( + publisher_options=PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + + # The `topic_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/topics/{topic_id}` + topic_path = publisher.topic_path(topic_project_id, topic_id) + # Publish messages. + for n in range(1, 10): + data_str = f"Message number {n}" + # Data must be a bytestring + data = data_str.encode("utf-8") + # When you publish a message, the client returns a future. + future = publisher.publish(topic_path, data) + print(future.result()) + + print(f"Published messages to {topic_path}.") + + # [END pubsub_publish_otel_tracing] + + def publish_messages(project_id: str, topic_id: str) -> None: """Publishes multiple messages to a Pub/Sub topic.""" # [START pubsub_quickstart_publisher] @@ -522,6 +599,13 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: create_parser = subparsers.add_parser("create", help=create_topic.__doc__) create_parser.add_argument("topic_id") + pubsub_publish_otel_tracing_parser = subparsers.add_parser( + "pubsub-publish-otel-tracing", help=pubsub_publish_otel_tracing.__doc__ + ) + pubsub_publish_otel_tracing_parser.add_argument("topic_project_id") + pubsub_publish_otel_tracing_parser.add_argument("trace_project_id") + pubsub_publish_otel_tracing_parser.add_argument("topic_id") + create_topic_with_kinesis_ingestion_parser = subparsers.add_parser( "create_kinesis_ingestion", help=create_topic_with_kinesis_ingestion.__doc__ ) @@ -638,3 +722,7 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: resume_publish_with_ordering_keys(args.project_id, args.topic_id) elif args.command == "detach-subscription": detach_subscription(args.project_id, args.subscription_id) + elif args.command == "pubsub-publish-otel-tracing": + pubsub_publish_otel_tracing( + args.topic_project_id, args.trace_project_id, args.topic_id + ) diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index b2dfe2d92..f410f8f62 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -3,3 +3,6 @@ avro==1.12.0 protobuf===4.24.4; python_version == '3.7' protobuf==5.28.0; python_version >= '3.8' avro==1.12.0 +opentelemetry-api==1.22.0 +opentelemetry-sdk==1.22.0 +opentelemetry-exporter-gcp-trace==1.7.0 \ No newline at end of file