Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OpenTelemetry publish sample #1258

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,83 @@ def delete_topic(project_id: str, topic_id: str) -> None:
# [END pubsub_delete_topic]


def pubsub_publish_otel_tracing(
topic_project_id: str, trace_project_id: str, topic_id: str
) -> None:
"""
Publish to `topic_id` in `topic_project_id` with OpenTelemetry enabled.
Export the OpenTelemetry traces to Google Cloud Trace in project
`trace_project_id`

Args:
topic_project_id: project ID of the topic to publish to.
trace_project_id: project ID to export Cloud Trace to.
topic_id: topic ID to publish to.

Returns:
None
"""
# [START pubsub_publish_otel_tracing]

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import PublisherOptions

# TODO(developer)
# topic_project_id = "your-topic-project-id"
# trace_project_id = "your-trace-project-id"
# topic_id = "your-topic-id"

# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace.
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(cloud_trace_exporter)
)

# Set the `enable_open_telemetry_tracing` option to True when creating
# the publisher client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
publisher = PublisherClient(
publisher_options=PublisherOptions(
enable_open_telemetry_tracing=True,
),
)

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(topic_project_id, topic_id)
# Publish messages.
for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data)
print(future.result())

print(f"Published messages to {topic_path}.")

# [END pubsub_publish_otel_tracing]


def publish_messages(project_id: str, topic_id: str) -> None:
"""Publishes multiple messages to a Pub/Sub topic."""
# [START pubsub_quickstart_publisher]
Expand Down Expand Up @@ -522,6 +599,13 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
create_parser = subparsers.add_parser("create", help=create_topic.__doc__)
create_parser.add_argument("topic_id")

pubsub_publish_otel_tracing_parser = subparsers.add_parser(
"pubsub-publish-otel-tracing", help=pubsub_publish_otel_tracing.__doc__
)
pubsub_publish_otel_tracing_parser.add_argument("topic_project_id")
pubsub_publish_otel_tracing_parser.add_argument("trace_project_id")
pubsub_publish_otel_tracing_parser.add_argument("topic_id")

create_topic_with_kinesis_ingestion_parser = subparsers.add_parser(
"create_kinesis_ingestion", help=create_topic_with_kinesis_ingestion.__doc__
)
Expand Down Expand Up @@ -638,3 +722,7 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
resume_publish_with_ordering_keys(args.project_id, args.topic_id)
elif args.command == "detach-subscription":
detach_subscription(args.project_id, args.subscription_id)
elif args.command == "pubsub-publish-otel-tracing":
pubsub_publish_otel_tracing(
args.topic_project_id, args.trace_project_id, args.topic_id
)
3 changes: 3 additions & 0 deletions samples/snippets/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ avro==1.12.0
protobuf===4.24.4; python_version == '3.7'
protobuf==5.28.0; python_version >= '3.8'
avro==1.12.0
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-gcp-trace==1.7.0