Skip to content

Commit

Permalink
Custom sampler fix (#3026)
Browse files Browse the repository at this point in the history
* Fixed circular dependency that can happen when injecting custom samplers

* lint

* Deleted duplicate tests

* lint

* lint

* lint

* lint

* lint

* Update opentelemetry-sdk/src/opentelemetry/sdk/_configuration/__init__.py

Co-authored-by: Leighton Chen <lechen@microsoft.com>

* Update opentelemetry-sdk/src/opentelemetry/sdk/_configuration/__init__.py

Co-authored-by: Leighton Chen <lechen@microsoft.com>

* typing

* Update opentelemetry-sdk/src/opentelemetry/sdk/_configuration/__init__.py

Co-authored-by: Leighton Chen <lechen@microsoft.com>

* Update opentelemetry-sdk/src/opentelemetry/sdk/_configuration/__init__.py

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>

* Retry tests

* Fixed circular dependency that can happen when injecting custom samplers

* lint

* Deleted duplicate tests

* lint

* lint

* lint

* lint

* lint

* Update opentelemetry-sdk/src/opentelemetry/sdk/_configuration/__init__.py

Co-authored-by: Leighton Chen <lechen@microsoft.com>

* Update opentelemetry-sdk/src/opentelemetry/sdk/_configuration/__init__.py

Co-authored-by: Leighton Chen <lechen@microsoft.com>

* typing

* Retry tests

* Update opentelemetry-sdk/src/opentelemetry/sdk/_configuration/__init__.py

Co-authored-by: Leighton Chen <lechen@microsoft.com>

* Update opentelemetry-sdk/src/opentelemetry/sdk/_configuration/__init__.py

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>

* Updated contrib sha

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
Co-authored-by: Leighton Chen <lechen@microsoft.com>
Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
  • Loading branch information
4 people authored Nov 21, 2022
1 parent 74ced85 commit b02ff47
Show file tree
Hide file tree
Showing 8 changed files with 412 additions and 329 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ env:
# Otherwise, set variable to the commit of your branch on
# opentelemetry-python-contrib which is compatible with these Core repo
# changes.
CONTRIB_REPO_SHA: 66edf69811e142c397d8500cafe6eddeb5565d6e
CONTRIB_REPO_SHA: c6134843900e2eeb1b8b3383a897b38cc0905c38
# This is needed because we do not clone the core repo in contrib builds anymore.
# When running contrib builds as part of core builds, we use actions/checkout@v2 which
# does not set an environment variable (simply just runs tox), which is different when
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Fixed circular dependency issue with custom samplers
([#3026](https://github.com/open-telemetry/opentelemetry-python/pull/3026))
- Add missing entry points for OTLP/HTTP exporter
([#3027](https://github.com/open-telemetry/opentelemetry-python/pull/3027))

Expand Down
78 changes: 72 additions & 6 deletions opentelemetry-sdk/src/opentelemetry/sdk/_configuration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import os
from abc import ABC, abstractmethod
from os import environ
from typing import Dict, Optional, Sequence, Tuple, Type
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Type

from pkg_resources import iter_entry_points
from typing_extensions import Literal

from opentelemetry.environment_variables import (
Expand All @@ -44,6 +45,8 @@
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL,
OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_EXPORTER_OTLP_TRACES_PROTOCOL,
OTEL_TRACES_SAMPLER,
OTEL_TRACES_SAMPLER_ARG,
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
Expand All @@ -54,7 +57,7 @@
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
from opentelemetry.sdk.trace.id_generator import IdGenerator
from opentelemetry.sdk.util import _import_config_components
from opentelemetry.sdk.trace.sampling import Sampler
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import set_tracer_provider

Expand Down Expand Up @@ -82,9 +85,35 @@
_RANDOM_ID_GENERATOR = "random"
_DEFAULT_ID_GENERATOR = _RANDOM_ID_GENERATOR

_OTEL_SAMPLER_ENTRY_POINT_GROUP = "opentelemetry_traces_sampler"

_logger = logging.getLogger(__name__)


def _import_config_components(
selected_components: List[str], entry_point_name: str
) -> Sequence[Tuple[str, object]]:
component_entry_points = {
ep.name: ep for ep in iter_entry_points(entry_point_name)
}
component_impls = []
for selected_component in selected_components:
entry_point = component_entry_points.get(selected_component, None)
if not entry_point:
raise RuntimeError(
f"Requested component '{selected_component}' not found in entry points for '{entry_point_name}'"
)

component_impl = entry_point.load()
component_impls.append((selected_component, component_impl))

return component_impls


def _get_sampler() -> Optional[str]:
return environ.get(OTEL_TRACES_SAMPLER, None)


def _get_id_generator() -> str:
return environ.get(OTEL_PYTHON_ID_GENERATOR, _DEFAULT_ID_GENERATOR)

Expand Down Expand Up @@ -149,7 +178,8 @@ def _get_exporter_names(

def _init_tracing(
exporters: Dict[str, Type[SpanExporter]],
id_generator: IdGenerator,
id_generator: IdGenerator = None,
sampler: Sampler = None,
auto_instrumentation_version: Optional[str] = None,
):
# if env var OTEL_RESOURCE_ATTRIBUTES is given, it will read the service_name
Expand All @@ -161,7 +191,8 @@ def _init_tracing(
ResourceAttributes.TELEMETRY_AUTO_VERSION
] = auto_instrumentation_version
provider = TracerProvider(
id_generator=id_generator(),
id_generator=id_generator,
sampler=sampler,
resource=Resource.create(auto_resource),
)
set_tracer_provider(provider)
Expand Down Expand Up @@ -266,13 +297,41 @@ def _import_exporters(
return trace_exporters, metric_exporters, log_exporters


def _import_sampler_factory(sampler_name: str) -> Callable[[str], Sampler]:
_, sampler_impl = _import_config_components(
[sampler_name.strip()], _OTEL_SAMPLER_ENTRY_POINT_GROUP
)[0]
return sampler_impl


def _import_sampler(sampler_name: str) -> Optional[Sampler]:
if not sampler_name:
return None
try:
sampler_factory = _import_sampler_factory(sampler_name)
sampler_arg = os.getenv(OTEL_TRACES_SAMPLER_ARG, "")
sampler = sampler_factory(sampler_arg)
if not isinstance(sampler, Sampler):
message = f"Sampler factory, {sampler_factory}, produced output, {sampler}, which is not a Sampler."
_logger.warning(message)
raise ValueError(message)
return sampler
except Exception as exc: # pylint: disable=broad-except
_logger.warning(
"Using default sampler. Failed to initialize custom sampler, %s: %s",
sampler_name,
exc,
)
return None


def _import_id_generator(id_generator_name: str) -> IdGenerator:
id_generator_name, id_generator_impl = _import_config_components(
[id_generator_name.strip()], "opentelemetry_id_generator"
)[0]

if issubclass(id_generator_impl, IdGenerator):
return id_generator_impl
return id_generator_impl()

raise RuntimeError(f"{id_generator_name} is not an IdGenerator")

Expand All @@ -283,9 +342,16 @@ def _initialize_components(auto_instrumentation_version):
_get_exporter_names("metrics"),
_get_exporter_names("logs"),
)
sampler_name = _get_sampler()
sampler = _import_sampler(sampler_name)
id_generator_name = _get_id_generator()
id_generator = _import_id_generator(id_generator_name)
_init_tracing(trace_exporters, id_generator, auto_instrumentation_version)
_init_tracing(
exporters=trace_exporters,
id_generator=id_generator,
sampler=sampler,
auto_instrumentation_version=auto_instrumentation_version,
)
_init_metrics(metric_exporters, auto_instrumentation_version)
logging_enabled = os.getenv(
_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false"
Expand Down
37 changes: 25 additions & 12 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@

_ENV_VALUE_UNSET = ""

# pylint: disable=protected-access
_TRACE_SAMPLER = sampling._get_from_env_or_default()


class SpanProcessor:
"""Interface which allows hooks for SDK's `Span` start and end method
Expand Down Expand Up @@ -334,7 +331,7 @@ def _check_span_ended(func):
def wrapper(self, *args, **kwargs):
already_ended = False
with self._lock: # pylint: disable=protected-access
if self._end_time is None:
if self._end_time is None: # pylint: disable=protected-access
func(self, *args, **kwargs)
else:
already_ended = True
Expand Down Expand Up @@ -519,7 +516,11 @@ def _format_events(events):
f_event = OrderedDict()
f_event["name"] = event.name
f_event["timestamp"] = util.ns_to_iso_str(event.timestamp)
f_event["attributes"] = Span._format_attributes(event.attributes)
f_event[
"attributes"
] = Span._format_attributes( # pylint: disable=protected-access
event.attributes
)
f_events.append(f_event)
return f_events

Expand All @@ -528,8 +529,16 @@ def _format_links(links):
f_links = []
for link in links:
f_link = OrderedDict()
f_link["context"] = Span._format_context(link.context)
f_link["attributes"] = Span._format_attributes(link.attributes)
f_link[
"context"
] = Span._format_context( # pylint: disable=protected-access
link.context
)
f_link[
"attributes"
] = Span._format_attributes( # pylint: disable=protected-access
link.attributes
)
f_links.append(f_link)
return f_links

Expand Down Expand Up @@ -691,10 +700,12 @@ def _from_env_if_absent(
)

# not removed for backward compat. please use SpanLimits instead.
SPAN_ATTRIBUTE_COUNT_LIMIT = SpanLimits._from_env_if_absent(
None,
OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
_DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
SPAN_ATTRIBUTE_COUNT_LIMIT = (
SpanLimits._from_env_if_absent( # pylint: disable=protected-access
None,
OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
_DEFAULT_OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT,
)
)


Expand Down Expand Up @@ -1115,7 +1126,7 @@ class TracerProvider(trace_api.TracerProvider):

def __init__(
self,
sampler: sampling.Sampler = _TRACE_SAMPLER,
sampler: sampling.Sampler = None,
resource: Resource = Resource.create({}),
shutdown_on_exit: bool = True,
active_span_processor: Union[
Expand All @@ -1132,6 +1143,8 @@ def __init__(
else:
self.id_generator = id_generator
self._resource = resource
if not sampler:
sampler = sampling._get_from_env_or_default()
self.sampler = sampler
self._span_limits = span_limits or SpanLimits()
self._atexit_handler = None
Expand Down
64 changes: 20 additions & 44 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@
* parentbased_always_off - Sampler that respects its parent span's sampling decision, but otherwise never samples.
* parentbased_traceidratio - Sampler that respects its parent span's sampling decision, but otherwise samples probabalistically based on rate.
Sampling probability can be set with ``OTEL_TRACES_SAMPLER_ARG`` if the sampler is traceidratio or parentbased_traceidratio. Rate must be in the range [0.0,1.0]. When not provided rate will be set to 1.0 (maximum rate possible).
Sampling probability can be set with ``OTEL_TRACES_SAMPLER_ARG`` if the sampler is traceidratio or parentbased_traceidratio. Rate must be in the range [0.0,1.0]. When not provided rate will be set to
1.0 (maximum rate possible).
Prev example but with environment variables. Please make sure to set the env ``OTEL_TRACES_SAMPLER=traceidratio`` and ``OTEL_TRACES_SAMPLER_ARG=0.001``.
Expand All @@ -97,9 +98,10 @@
with trace.get_tracer(__name__).start_as_current_span("Test Span"):
...
In order to create a configurable custom sampler, create an entry point for the custom sampler factory method under the entry point group, ``opentelemetry_traces_sampler``. The custom sampler factory
method must be of type ``Callable[[str], Sampler]``, taking a single string argument and returning a Sampler object. The single input will come from the string value of the
``OTEL_TRACES_SAMPLER_ARG`` environment variable. If ``OTEL_TRACES_SAMPLER_ARG`` is not configured, the input will be an empty string. For example:
When utilizing a configurator, you can configure a custom sampler. In order to create a configurable custom sampler, create an entry point for the custom sampler
factory method or function under the entry point group, ``opentelemetry_traces_sampler``. The custom sampler factory method must be of type ``Callable[[str], Sampler]``, taking a single string argument and
returning a Sampler object. The single input will come from the string value of the ``OTEL_TRACES_SAMPLER_ARG`` environment variable. If ``OTEL_TRACES_SAMPLER_ARG`` is not configured, the input will
be an empty string. For example:
.. code:: python
Expand Down Expand Up @@ -134,15 +136,14 @@ class CustomSamplerFactory:
import os
from logging import getLogger
from types import MappingProxyType
from typing import Callable, Optional, Sequence
from typing import Optional, Sequence

# pylint: disable=unused-import
from opentelemetry.context import Context
from opentelemetry.sdk.environment_variables import (
OTEL_TRACES_SAMPLER,
OTEL_TRACES_SAMPLER_ARG,
)
from opentelemetry.sdk.util import _import_config_components
from opentelemetry.trace import Link, SpanKind, get_current_span
from opentelemetry.trace.span import TraceState
from opentelemetry.util.types import Attributes
Expand Down Expand Up @@ -193,9 +194,6 @@ def __init__(
self.trace_state = trace_state


_OTEL_SAMPLER_ENTRY_POINT_GROUP = "opentelemetry_traces_sampler"


class Sampler(abc.ABC):
@abc.abstractmethod
def should_sample(
Expand Down Expand Up @@ -407,48 +405,26 @@ def __init__(self, rate: float):


def _get_from_env_or_default() -> Sampler:
traces_sampler_name = os.getenv(
trace_sampler = os.getenv(
OTEL_TRACES_SAMPLER, "parentbased_always_on"
).lower()
if trace_sampler not in _KNOWN_SAMPLERS:
_logger.warning("Couldn't recognize sampler %s.", trace_sampler)
trace_sampler = "parentbased_always_on"

if traces_sampler_name in _KNOWN_SAMPLERS:
if traces_sampler_name in ("traceidratio", "parentbased_traceidratio"):
try:
rate = float(os.getenv(OTEL_TRACES_SAMPLER_ARG))
except ValueError:
_logger.warning(
"Could not convert TRACES_SAMPLER_ARG to float."
)
rate = 1.0
return _KNOWN_SAMPLERS[traces_sampler_name](rate)
return _KNOWN_SAMPLERS[traces_sampler_name]
try:
traces_sampler_factory = _import_sampler_factory(traces_sampler_name)
sampler_arg = os.getenv(OTEL_TRACES_SAMPLER_ARG, "")
traces_sampler = traces_sampler_factory(sampler_arg)
if not isinstance(traces_sampler, Sampler):
message = f"Traces sampler factory, {traces_sampler_factory}, produced output, {traces_sampler}, which is not a Sampler object."
_logger.warning(message)
raise ValueError(message)
return traces_sampler
except Exception as exc: # pylint: disable=broad-except
_logger.warning(
"Using default sampler. Failed to initialize custom sampler, %s: %s",
traces_sampler_name,
exc,
)
return _KNOWN_SAMPLERS["parentbased_always_on"]
if trace_sampler in ("traceidratio", "parentbased_traceidratio"):
try:
rate = float(os.getenv(OTEL_TRACES_SAMPLER_ARG))
except ValueError:
_logger.warning("Could not convert TRACES_SAMPLER_ARG to float.")
rate = 1.0
return _KNOWN_SAMPLERS[trace_sampler](rate)

return _KNOWN_SAMPLERS[trace_sampler]


def _get_parent_trace_state(parent_context) -> Optional["TraceState"]:
parent_span_context = get_current_span(parent_context).get_span_context()
if parent_span_context is None or not parent_span_context.is_valid:
return None
return parent_span_context.trace_state


def _import_sampler_factory(sampler_name: str) -> Callable[[str], Sampler]:
_, sampler_impl = _import_config_components(
[sampler_name.strip()], _OTEL_SAMPLER_ENTRY_POINT_GROUP
)[0]
return sampler_impl
Loading

0 comments on commit b02ff47

Please sign in to comment.