Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a working propagator, adding to integrations and example #137

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from requests.sessions import Session

from opentelemetry import propagators
from opentelemetry.trace import SpanKind


Expand Down Expand Up @@ -71,6 +72,8 @@ def instrumented_request(self, method, url, *args, **kwargs):
# TODO: Propagate the trace context via headers once we have a way
# to access propagators.

headers = kwargs.setdefault("headers", {})
propagators.inject(tracer, type(headers).__setitem__, headers)
result = wrapped(self, method, url, *args, **kwargs) # *** PROCEED

span.set_attribute("http.status_code", result.status_code)
Expand Down
28 changes: 22 additions & 6 deletions ext/opentelemetry-ext-wsgi/src/opentelemetry/ext/wsgi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
"""

import functools
import typing
import wsgiref.util as wsgiref_util

from opentelemetry import trace
from opentelemetry import propagators, trace
from opentelemetry.ext.wsgi.version import __version__ # noqa


Expand All @@ -35,12 +36,9 @@ class OpenTelemetryMiddleware:
wsgi: The WSGI application callable.
"""

def __init__(self, wsgi, propagators=None):
def __init__(self, wsgi):
self.wsgi = wsgi

# TODO: implement context propagation
self.propagators = propagators

@staticmethod
def _add_request_attributes(span, environ):
span.set_attribute("component", "http")
Expand Down Expand Up @@ -87,8 +85,11 @@ def __call__(self, environ, start_response):

tracer = trace.tracer()
path_info = environ["PATH_INFO"] or "/"
parent_span = propagators.extract(get_header_from_environ, environ)

with tracer.start_span(path_info, kind=trace.SpanKind.SERVER) as span:
with tracer.start_span(
path_info, parent_span, kind=trace.SpanKind.SERVER
) as span:
self._add_request_attributes(span, environ)
start_response = self._create_start_response(span, start_response)

Expand All @@ -99,3 +100,18 @@ def __call__(self, environ, start_response):
finally:
if hasattr(iterable, "close"):
iterable.close()


def get_header_from_environ(
environ: dict, header_name: str
) -> typing.List[str]:
"""Retrieve the header value from the wsgi environ dictionary.

Returns:
A string with the header value if it exists, else None.
"""
environ_key = "HTTP_" + header_name.upper().replace("-", "_")
value = environ.get(environ_key)
if value:
return [value]
return []
2 changes: 1 addition & 1 deletion ext/opentelemetry-ext-wsgi/tests/test_wsgi_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def validate_response(self, response, error=None):

# Verify that start_span has been called
self.start_span.assert_called_once_with(
"/", kind=trace_api.SpanKind.SERVER
"/", trace_api.INVALID_SPAN_CONTEXT, kind=trace_api.SpanKind.SERVER
)

def test_basic_wsgi_call(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

from opentelemetry.trace import SpanContext

Setter = typing.Callable[[object, str, str], None]
Getter = typing.Callable[[object, str], typing.List[str]]
_T = typing.TypeVar("_T")

Setter = typing.Callable[[typing.Type[_T], str, str], None]
Getter = typing.Callable[[typing.Type[_T], str], typing.List[str]]


class HTTPTextFormat(abc.ABC):
Expand Down Expand Up @@ -70,7 +72,7 @@ def example_route():

@abc.abstractmethod
def extract(
self, get_from_carrier: Getter, carrier: object
self, get_from_carrier: Getter[_T], carrier: _T
) -> SpanContext:
"""Create a SpanContext from values in the carrier.

Expand All @@ -93,7 +95,7 @@ def extract(

@abc.abstractmethod
def inject(
self, context: SpanContext, set_in_carrier: Setter, carrier: object
self, context: SpanContext, set_in_carrier: Setter[_T], carrier: _T
) -> None:
"""Inject values from a SpanContext into a carrier.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2019, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import typing

import opentelemetry.trace as trace
from opentelemetry.context.propagation import httptextformat

_T = typing.TypeVar("_T")


class TraceContextHTTPTextFormat(httptextformat.HTTPTextFormat):
"""TODO: extracts and injects using w3c TraceContext's headers.
"""

def extract(
self, _get_from_carrier: httptextformat.Getter[_T], _carrier: _T
) -> trace.SpanContext:
return trace.INVALID_SPAN_CONTEXT

def inject(
self,
context: trace.SpanContext,
set_in_carrier: httptextformat.Setter[_T],
carrier: _T,
) -> None:
pass
70 changes: 70 additions & 0 deletions opentelemetry-api/src/opentelemetry/propagators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import typing

import opentelemetry.context.propagation.httptextformat as httptextformat
import opentelemetry.trace as trace
from opentelemetry.context.propagation.tracecontexthttptextformat import (
TraceContextHTTPTextFormat,
)

_T = typing.TypeVar("_T")


def extract(
get_from_carrier: httptextformat.Getter[_T], carrier: _T
) -> trace.SpanContext:
"""Load the parent SpanContext from values in the carrier.

Using the specified HTTPTextFormatter, the propagator will
extract a SpanContext from the carrier. If one is found,
it will be set as the parent context of the current span.

Args:
get_from_carrier: a function that can retrieve zero
or more values from the carrier. In the case that
the value does not exist, return an empty list.
carrier: and object which contains values that are
used to construct a SpanContext. This object
must be paired with an appropriate get_from_carrier
which understands how to extract a value from it.
"""
return get_global_httptextformat().extract(get_from_carrier, carrier)


def inject(
tracer: trace.Tracer,
set_in_carrier: httptextformat.Setter[_T],
carrier: _T,
) -> None:
"""Inject values from the current context into the carrier.

inject enables the propagation of values into HTTP clients or
other objects which perform an HTTP request. Implementations
should use the set_in_carrier method to set values on the
carrier.

Args:
set_in_carrier: A setter function that can set values
on the carrier.
carrier: An object that contains a representation of HTTP
headers. Should be paired with set_in_carrier, which
should know how to set header values on the carrier.
"""
get_global_httptextformat().inject(
tracer.get_current_span().get_context(), set_in_carrier, carrier
)


_HTTP_TEXT_FORMAT = (
TraceContextHTTPTextFormat()
) # type: httptextformat.HTTPTextFormat


def get_global_httptextformat() -> httptextformat.HTTPTextFormat:
return _HTTP_TEXT_FORMAT


def set_global_httptextformat(
http_text_format: httptextformat.HTTPTextFormat
) -> None:
global _HTTP_TEXT_FORMAT # pylint:disable=global-statement
_HTTP_TEXT_FORMAT = http_text_format
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
This module serves as an example to integrate with flask, using
the requests library to perform downstream requests
"""
import time

import flask
import requests

import opentelemetry.ext.http_requests
from opentelemetry import trace
from opentelemetry import propagators, trace
from opentelemetry.ext.wsgi import OpenTelemetryMiddleware
from opentelemetry.sdk.context.propagation.b3_format import B3Format
from opentelemetry.sdk.trace import Tracer


Expand All @@ -39,14 +39,20 @@ def configure_opentelemetry(flask_app: flask.Flask):

* processors?
* exporters?
* propagators?
"""
# Start by configuring all objects required to ensure
# a complete end to end workflow.
# the preferred implementation of these objects must be set,
# as the opentelemetry-api defines the interface with a no-op
# implementation.
trace.set_preferred_tracer_implementation(lambda _: Tracer())
# Next, we need to configure how the values that are used by
# traces and metrics are propagated (such as what specific headers
# carry this value).

# TBD: can remove once default TraceContext propagators are installed.
propagators.set_global_httptextformat(B3Format())

# Integrations are the glue that binds the OpenTelemetry API
# and the frameworks and libraries that are used together, automatically
# creating Spans and propagating context as appropriate.
Expand All @@ -61,8 +67,8 @@ def configure_opentelemetry(flask_app: flask.Flask):
def hello():
# emit a trace that measures how long the
# sleep takes
with trace.tracer().start_span("sleep"):
time.sleep(0.001)
with trace.tracer().start_span("example-request"):
requests.get("http://www.example.com")
return "hello"


Expand Down
51 changes: 48 additions & 3 deletions opentelemetry-example-app/tests/test_flask_example.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,59 @@
import unittest
from unittest import mock

import requests
from werkzeug.test import Client
from werkzeug.wrappers import BaseResponse

import opentelemetry_example_app.flask_example as flask_example
from opentelemetry.sdk import trace
from opentelemetry.sdk.context.propagation import b3_format


class TestFlaskExample(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.app = flask_example.app

def setUp(self):
mocked_response = requests.models.Response()
mocked_response.status_code = 200
mocked_response.reason = "Roger that!"
self.send_patcher = mock.patch.object(
requests.Session,
"send",
autospec=True,
spec_set=True,
return_value=mocked_response,
)
self.send = self.send_patcher.start()

def tearDown(self):
self.send_patcher.stop()

def test_full_path(self):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is now a full-fledged propagation test. We could probably build on this once we have stuff like exporters.

@c24t I think this will work for one of the test cases we discussed, although not as comprehensive as bringing up a full server.

with self.app.test_client() as client:
response = client.get("/")
assert response.data.decode() == "hello"
trace_id = trace.generate_trace_id()
# We need to use the Werkzeug test app because
# The headers are injected at the wsgi layer.
# The flask test app will not include these, and
# result in the values not propagated.
client = Client(self.app.wsgi_app, BaseResponse)
# emulate b3 headers
client.get(
"/",
headers={
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use the B3 formatter's inject directly on the dict here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that might be a little tricky: it would require that I have direct access to the SpanContext for the app, which may not occur in situations where the app lives in a different thread or context than the test code itself.

I feel like this is a more thorough test of the defined behavior, although I definitely see the merit of not effectively redefining the b3 interface.

"x-b3-traceid": b3_format.format_trace_id(trace_id),
"x-b3-spanid": b3_format.format_span_id(
trace.generate_span_id()
),
"x-b3-sampled": "1",
},
)
# assert the http request header was propagated through.
prepared_request = self.send.call_args[0][1]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be sure: This does not test the headers that were "sent" by client.get but the headers that were sent by the example app using requests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's not an easy way to differentiate between headers that were directly set by a user vs the headers that were set in the propagator: both are setting the headers keyword that is passed in as part of the request.

Theoretically someone could modify the examples to send the same headers that the propagator is responsible for, but that's the not case today. Also the way that the integration is written, propagator headers will override any user-defined headers.

headers = prepared_request.headers
for required_header in {"x-b3-traceid", "x-b3-spanid", "x-b3-sampled"}:
self.assertIn(required_header, headers)
self.assertEqual(
headers["x-b3-traceid"], b3_format.format_trace_id(trace_id)
)
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ def extract(cls, get_from_carrier, carrier):
# header is set to allow.
if sampled in cls._SAMPLE_PROPAGATE_VALUES or flags == "1":
options |= trace.TraceOptions.RECORDED

return trace.SpanContext(
# trace an span ids are encoded in hex, so must be converted
trace_id=int(trace_id, 16),
Expand Down