Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make SpanProcessor.on_start accept parent Context #1251

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions exporter/opentelemetry-exporter-datadog/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Make `SpanProcessor.on_start` accept parent Context
([#1251](https://github.com/open-telemetry/opentelemetry-python/pull/1251))

## Version 0.14b0

Released 2020-10-13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import threading
import typing

from opentelemetry.context import attach, detach, set_value
from opentelemetry.context import Context, attach, detach, set_value
from opentelemetry.sdk.trace import Span, SpanProcessor
from opentelemetry.sdk.trace.export import SpanExporter
from opentelemetry.trace import INVALID_TRACE_ID
Expand Down Expand Up @@ -81,7 +81,9 @@ def __init__(
self.done = False
self.worker_thread.start()

def on_start(self, span: Span) -> None:
def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
) -> None:
ctx = span.get_span_context()
trace_id = ctx.trace_id

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ddtrace.internal.writer import AgentWriter

from opentelemetry import trace as trace_api
from opentelemetry.context import Context
from opentelemetry.exporter import datadog
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace import Resource, sampling
Expand Down Expand Up @@ -482,6 +483,21 @@ def test_span_processor_scheduled_delay(self):

tracer_provider.shutdown()

def test_span_processor_accepts_parent_context(self):
span_processor = mock.Mock(
wraps=datadog.DatadogExportSpanProcessor(self.exporter)
)
tracer_provider = trace.TracerProvider()
tracer_provider.add_span_processor(span_processor)
tracer = tracer_provider.get_tracer(__name__)

context = Context()
span = tracer.start_span("foo", context=context)

span_processor.on_start.assert_called_once_with(
span, parent_context=context
)

def test_origin(self):
context = trace_api.SpanContext(
trace_id=0x000000000000000000000000DEADBEEF,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

class HelloWorldResource:
def _handle_request(self, _, resp):
# pylint: disable=no-member
resp.status = falcon.HTTP_201
resp.body = "Hello World"

Expand Down
3 changes: 3 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Make `SpanProcessor.on_start` accept parent Context
([#1251](https://github.com/open-telemetry/opentelemetry-python/pull/1251))

## Version 0.14b0

Released 2020-10-13
Expand Down
48 changes: 35 additions & 13 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,19 @@ class SpanProcessor:
in the same order as they were registered.
"""

def on_start(self, span: "Span") -> None:
def on_start(
self,
span: "Span",
parent_context: Optional[context_api.Context] = None,
) -> None:
"""Called when a :class:`opentelemetry.trace.Span` is started.

This method is called synchronously on the thread that starts the
span, therefore it should not block or throw an exception.

Args:
span: The :class:`opentelemetry.trace.Span` that just started.
parent_context: The parent context of the span that just started.
"""

def on_end(self, span: "Span") -> None:
Expand Down Expand Up @@ -124,9 +129,13 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
with self._lock:
self._span_processors = self._span_processors + (span_processor,)

def on_start(self, span: "Span") -> None:
def on_start(
self,
span: "Span",
parent_context: Optional[context_api.Context] = None,
) -> None:
for sp in self._span_processors:
sp.on_start(span)
sp.on_start(span, parent_context=parent_context)

def on_end(self, span: "Span") -> None:
for sp in self._span_processors:
Expand Down Expand Up @@ -192,17 +201,26 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
self._span_processors = self._span_processors + (span_processor,)

def _submit_and_await(
self, func: Callable[[SpanProcessor], Callable[..., None]], *args: Any
self,
func: Callable[[SpanProcessor], Callable[..., None]],
*args: Any,
**kwargs: Any
):
futures = []
for sp in self._span_processors:
future = self._executor.submit(func(sp), *args)
future = self._executor.submit(func(sp), *args, **kwargs)
futures.append(future)
for future in futures:
future.result()

def on_start(self, span: "Span") -> None:
self._submit_and_await(lambda sp: sp.on_start, span)
def on_start(
self,
span: "Span",
parent_context: Optional[context_api.Context] = None,
) -> None:
self._submit_and_await(
lambda sp: sp.on_start, span, parent_context=parent_context
)

def on_end(self, span: "Span") -> None:
self._submit_and_await(lambda sp: sp.on_end, span)
Expand Down Expand Up @@ -584,7 +602,11 @@ def add_event(
)
)

def start(self, start_time: Optional[int] = None) -> None:
def start(
self,
start_time: Optional[int] = None,
parent_context: Optional[context_api.Context] = None,
) -> None:
with self._lock:
if not self.is_recording():
return
Expand All @@ -596,7 +618,7 @@ def start(self, start_time: Optional[int] = None) -> None:
if has_started:
logger.warning("Calling start() on a started span.")
return
self.span_processor.on_start(self)
self.span_processor.on_start(self, parent_context=parent_context)

def end(self, end_time: Optional[int] = None) -> None:
with self._lock:
Expand Down Expand Up @@ -764,7 +786,7 @@ def start_span( # pylint: disable=too-many-locals
if sampling_result.decision.is_sampled()
else trace_api.TraceFlags(trace_api.TraceFlags.DEFAULT)
)
context = trace_api.SpanContext(
span_context = trace_api.SpanContext(
trace_id,
self.source.ids_generator.generate_span_id(),
is_remote=False,
Expand All @@ -777,7 +799,7 @@ def start_span( # pylint: disable=too-many-locals
# pylint:disable=protected-access
span = _Span(
name=name,
context=context,
context=span_context,
parent=parent_span_context,
sampler=self.source.sampler,
resource=self.source.resource,
Expand All @@ -788,9 +810,9 @@ def start_span( # pylint: disable=too-many-locals
instrumentation_info=self.instrumentation_info,
set_status_on_exception=set_status_on_exception,
)
span.start(start_time=start_time)
span.start(start_time=start_time, parent_context=context)
else:
span = trace_api.DefaultSpan(context=context)
span = trace_api.DefaultSpan(context=span_context)
return span

@contextmanager
Expand Down
10 changes: 7 additions & 3 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from enum import Enum

from opentelemetry.configuration import Configuration
from opentelemetry.context import attach, detach, set_value
from opentelemetry.context import Context, attach, detach, set_value
from opentelemetry.sdk.trace import Span, SpanProcessor
from opentelemetry.util import time_ns

Expand Down Expand Up @@ -70,7 +70,9 @@ class SimpleExportSpanProcessor(SpanProcessor):
def __init__(self, span_exporter: SpanExporter):
self.span_exporter = span_exporter

def on_start(self, span: Span) -> None:
def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
) -> None:
pass

def on_end(self, span: Span) -> None:
Expand Down Expand Up @@ -172,7 +174,9 @@ def __init__(
] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
self.worker_thread.start()

def on_start(self, span: Span) -> None:
def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
) -> None:
pass

def on_end(self, span: Span) -> None:
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/tests/error_handler/test_error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@


class TestErrorHandler(TestCase):
def test_default_error_handler(self):
@patch("opentelemetry.sdk.error_handler.iter_entry_points")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this change for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when running the test locally with the eachdist script this test fails. The problem is that there are some entry points already setup when installing a dev environment with eachdist develop. This test however checkes the default error handler when no error handler entry points are configured.

def test_default_error_handler(self, mock_iter_entry_points):

with self.assertLogs(logger, ERROR):
with GlobalErrorHandler():
Expand Down
41 changes: 41 additions & 0 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.configuration import Configuration
from opentelemetry.context import Context
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace import export

Expand Down Expand Up @@ -100,6 +102,23 @@ def test_simple_span_processor_no_context(self):

self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)

def test_on_start_accepts_context(self):
# pylint: disable=no-self-use
tracer_provider = trace.TracerProvider()
tracer = tracer_provider.get_tracer(__name__)

exporter = MySpanExporter([])
span_processor = mock.Mock(
wraps=export.SimpleExportSpanProcessor(exporter)
)
tracer_provider.add_span_processor(span_processor)

context = Context()
span = tracer.start_span("foo", context=context)
span_processor.on_start.assert_called_once_with(
span, parent_context=context
)

def test_simple_span_processor_not_sampled(self):
tracer_provider = trace.TracerProvider(
sampler=trace.sampling.ALWAYS_OFF
Expand Down Expand Up @@ -136,6 +155,11 @@ def _create_start_and_end_span(name, span_processor):


class TestBatchExportSpanProcessor(unittest.TestCase):
def tearDown(self) -> None:
# reset global state of configuration object
# pylint: disable=protected-access
Configuration._reset()

@mock.patch.dict(
"os.environ",
{
Expand All @@ -156,6 +180,23 @@ def test_batch_span_processor_environment_variables(self):
self.assertEqual(batch_span_processor.max_export_batch_size, 3)
self.assertEqual(batch_span_processor.export_timeout_millis, 4)

def test_on_start_accepts_parent_context(self):
# pylint: disable=no-self-use
my_exporter = MySpanExporter(destination=[])
span_processor = mock.Mock(
wraps=export.BatchExportSpanProcessor(my_exporter)
)
tracer_provider = trace.TracerProvider()
tracer_provider.add_span_processor(span_processor)
tracer = tracer_provider.get_tracer(__name__)

context = Context()
span = tracer.start_span("foo", context=context)

span_processor.on_start.assert_called_once_with(
span, parent_context=context
)

def test_shutdown(self):
spans_names_list = []

Expand Down
13 changes: 10 additions & 3 deletions opentelemetry-sdk/tests/trace/test_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import typing
import unittest
from threading import Event
from typing import Optional
from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.context import Context
from opentelemetry.sdk import trace


Expand All @@ -36,7 +38,9 @@ def __init__(self, name, span_list):
self.name = name
self.span_list = span_list

def on_start(self, span: "trace.Span") -> None:
def on_start(
self, span: "trace.Span", parent_context: Optional[Context] = None
) -> None:
self.span_list.append(span_event_start_fmt(self.name, span.name))

def on_end(self, span: "trace.Span") -> None:
Expand Down Expand Up @@ -160,10 +164,13 @@ def test_on_start(self):
multi_processor.add_span_processor(mock_processor)

span = self.create_default_span()
multi_processor.on_start(span)
context = Context()
multi_processor.on_start(span, parent_context=context)

for mock_processor in mocks:
mock_processor.on_start.assert_called_once_with(span)
mock_processor.on_start.assert_called_once_with(
span, parent_context=context
)
multi_processor.shutdown()

def test_on_end(self):
Expand Down
22 changes: 21 additions & 1 deletion opentelemetry-sdk/tests/trace/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import subprocess
import unittest
from logging import ERROR, WARNING
from typing import Optional
from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.context import Context
from opentelemetry.sdk import resources, trace
from opentelemetry.sdk.trace import Resource, sampling
from opentelemetry.sdk.util import ns_to_iso_str
Expand Down Expand Up @@ -435,6 +437,8 @@ def test_disallow_direct_span_creation(self):


class TestSpan(unittest.TestCase):
# pylint: disable=too-many-public-methods

def setUp(self):
self.tracer = new_tracer()

Expand Down Expand Up @@ -734,6 +738,20 @@ def test_start_span(self):
)
self.assertIs(span.status.description, "Test description")

def test_start_accepts_context(self):
# pylint: disable=no-self-use
span_processor = mock.Mock(spec=trace.SpanProcessor)
span = trace._Span(
"name",
mock.Mock(spec=trace_api.SpanContext),
span_processor=span_processor,
)
context = Context()
span.start(parent_context=context)
span_processor.on_start.assert_called_once_with(
span, parent_context=context
)

def test_span_override_start_and_end_time(self):
"""Span sending custom start_time and end_time values"""
span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext))
Expand Down Expand Up @@ -899,7 +917,9 @@ def __init__(self, name, span_list):
self.name = name
self.span_list = span_list

def on_start(self, span: "trace.Span") -> None:
def on_start(
self, span: "trace.Span", parent_context: Optional[Context] = None
) -> None:
self.span_list.append(span_event_start_fmt(self.name, span.name))

def on_end(self, span: "trace.Span") -> None:
Expand Down