Skip to content

Commit

Permalink
Merge branch 'main' into test-on-windows
Browse files Browse the repository at this point in the history
  • Loading branch information
alrex authored Jun 10, 2021
2 parents af5682d + a47a202 commit 64ae591
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 90 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.3.0-0.22b0...HEAD)

### Changed
- Updated `opentelemetry-opencensus-exporter` to use `service_name` of spans instead of resource
([#1897](https://github.com/open-telemetry/opentelemetry-python/pull/1897))

## [1.3.0-0.22b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.3.0-0.22b0) - 2021-06-01

### Added
Expand All @@ -24,6 +28,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Update protos to latest version release 0.9.0
([#1873](https://github.com/open-telemetry/opentelemetry-python/pull/1873))

### Fixed
- Updated `opentelementry-opentracing-shim` `ScopeShim` to report exceptions in
opentelemetry specification format, rather than opentracing spec format.

## [1.2.0, 0.21b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.2.0-0.21b0) - 2021-05-11

### Added
Expand All @@ -35,6 +43,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1829](https://github.com/open-telemetry/opentelemetry-python/pull/1829))
- Lazily read/configure limits and allow limits to be unset.
([#1839](https://github.com/open-telemetry/opentelemetry-python/pull/1839))
- Added support for OTEL_EXPORTER_JAEGER_TIMEOUT
([#1863](https://github.com/open-telemetry/opentelemetry-python/pull/1863))

### Changed
- Fixed OTLP gRPC exporter silently failing if scheme is not specified in endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
- :envvar:`OTEL_EXPORTER_JAEGER_ENDPOINT`
- :envvar:`OTEL_EXPORTER_JAEGER_CERTIFICATE`
- :envvar:`OTEL_EXPORTER_JAEGER_TIMEOUT`
API
---
Expand All @@ -68,7 +69,7 @@
from os import environ
from typing import Optional

from grpc import ChannelCredentials, insecure_channel, secure_channel
from grpc import ChannelCredentials, RpcError, insecure_channel, secure_channel

from opentelemetry import trace
from opentelemetry.exporter.jaeger.proto.grpc import util
Expand All @@ -85,11 +86,13 @@
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_TIMEOUT,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

DEFAULT_GRPC_COLLECTOR_ENDPOINT = "localhost:14250"
DEFAULT_EXPORT_TIMEOUT = 10

logger = logging.getLogger(__name__)

Expand All @@ -103,6 +106,7 @@ class JaegerExporter(SpanExporter):
insecure: True if collector has no encryption or authentication
credentials: Credentials for server authentication.
max_tag_value_length: Max length string attribute values can have. Set to None to disable.
timeout: Maximum time the Jaeger exporter should wait for each batch export.
"""

def __init__(
Expand All @@ -111,13 +115,15 @@ def __init__(
insecure: Optional[bool] = None,
credentials: Optional[ChannelCredentials] = None,
max_tag_value_length: Optional[int] = None,
timeout: Optional[int] = None,
):
self._max_tag_value_length = max_tag_value_length

self.collector_endpoint = _parameter_setter(
param=collector_endpoint,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_ENDPOINT),
default=None,
self.collector_endpoint = collector_endpoint or environ.get(
OTEL_EXPORTER_JAEGER_ENDPOINT, DEFAULT_GRPC_COLLECTOR_ENDPOINT
)
self._timeout = timeout or int(
environ.get(OTEL_EXPORTER_JAEGER_TIMEOUT, DEFAULT_EXPORT_TIMEOUT)
)
self._grpc_client = None
self.insecure = insecure
Expand All @@ -131,16 +137,15 @@ def __init__(

@property
def _collector_grpc_client(self) -> Optional[CollectorServiceStub]:
endpoint = self.collector_endpoint or DEFAULT_GRPC_COLLECTOR_ENDPOINT

if self._grpc_client is None:
if self.insecure:
self._grpc_client = CollectorServiceStub(
insecure_channel(endpoint)
insecure_channel(self.collector_endpoint)
)
else:
self._grpc_client = CollectorServiceStub(
secure_channel(endpoint, self.credentials)
secure_channel(self.collector_endpoint, self.credentials)
)
return self._grpc_client

Expand All @@ -161,25 +166,16 @@ def export(self, spans) -> SpanExportResult:
jaeger_spans = translator._translate(pb_translator)
batch = model_pb2.Batch(spans=jaeger_spans)
request = PostSpansRequest(batch=batch)
self._collector_grpc_client.PostSpans(request)

return SpanExportResult.SUCCESS
try:
self._collector_grpc_client.PostSpans(
request, timeout=self._timeout
)
return SpanExportResult.SUCCESS
except RpcError as error:
logger.warning(
"Failed to export batch. Status code: %s", error.code()
)
return SpanExportResult.FAILURE

def shutdown(self):
pass


def _parameter_setter(param, env_variable, default):
"""Returns value according to the provided data.
Args:
param: Constructor parameter value
env_variable: Environment variable related to the parameter
default: Constructor parameter default value
"""
if param is None:
res = env_variable or default
else:
res = param

return res
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_JAEGER_CERTIFICATE,
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_TIMEOUT,
OTEL_RESOURCE_ATTRIBUTES,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
Expand Down Expand Up @@ -71,6 +72,7 @@ def test_constructor_by_environment_variables(self):
OTEL_EXPORTER_JAEGER_CERTIFICATE: os.path.dirname(__file__)
+ "/certs/cred.cert",
OTEL_RESOURCE_ATTRIBUTES: "service.name=my-opentelemetry-jaeger",
OTEL_EXPORTER_JAEGER_TIMEOUT: "5",
},
)

Expand All @@ -81,6 +83,7 @@ def test_constructor_by_environment_variables(self):
self.assertEqual(exporter.service_name, service)
self.assertIsNotNone(exporter._collector_grpc_client)
self.assertEqual(exporter.collector_endpoint, collector_endpoint)
self.assertEqual(exporter._timeout, 5)
self.assertIsNotNone(exporter.credentials)
env_patch.stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
- :envvar:`OTEL_EXPORTER_JAEGER_AGENT_PORT`
- :envvar:`OTEL_EXPORTER_JAEGER_AGENT_HOST`
- :envvar:`OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES`
- :envvar:`OTEL_EXPORTER_JAEGER_TIMEOUT`
API
---
Expand Down Expand Up @@ -94,13 +95,15 @@
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES,
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_PASSWORD,
OTEL_EXPORTER_JAEGER_TIMEOUT,
OTEL_EXPORTER_JAEGER_USER,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

DEFAULT_AGENT_HOST_NAME = "localhost"
DEFAULT_AGENT_PORT = 6831
DEFAULT_EXPORT_TIMEOUT = 10

logger = logging.getLogger(__name__)

Expand All @@ -119,6 +122,7 @@ class JaegerExporter(SpanExporter):
required.
max_tag_value_length: Max length string attribute values can have. Set to None to disable.
udp_split_oversized_batches: Re-emit oversized batches in smaller chunks.
timeout: Maximum time the Jaeger exporter should wait for each batch export.
"""

def __init__(
Expand All @@ -130,51 +134,34 @@ def __init__(
password: Optional[str] = None,
max_tag_value_length: Optional[int] = None,
udp_split_oversized_batches: bool = None,
timeout: Optional[int] = None,
):
self._max_tag_value_length = max_tag_value_length
self.agent_host_name = _parameter_setter(
param=agent_host_name,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_AGENT_HOST),
default=DEFAULT_AGENT_HOST_NAME,
self.agent_host_name = agent_host_name or environ.get(
OTEL_EXPORTER_JAEGER_AGENT_HOST, DEFAULT_AGENT_HOST_NAME
)

environ_agent_port = environ.get(OTEL_EXPORTER_JAEGER_AGENT_PORT)
environ_agent_port = (
int(environ_agent_port) if environ_agent_port is not None else None
self.agent_port = agent_port or int(
environ.get(OTEL_EXPORTER_JAEGER_AGENT_PORT, DEFAULT_AGENT_PORT)
)

self.agent_port = _parameter_setter(
param=agent_port,
env_variable=environ_agent_port,
default=DEFAULT_AGENT_PORT,
self._timeout = timeout or int(
environ.get(OTEL_EXPORTER_JAEGER_TIMEOUT, DEFAULT_EXPORT_TIMEOUT)
)
self.udp_split_oversized_batches = _parameter_setter(
param=udp_split_oversized_batches,
env_variable=environ.get(
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES
),
default=False,

self.udp_split_oversized_batches = udp_split_oversized_batches or bool(
environ.get(OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES)
)
self._agent_client = AgentClientUDP(
host_name=self.agent_host_name,
port=self.agent_port,
split_oversized_batches=self.udp_split_oversized_batches,
)
self.collector_endpoint = _parameter_setter(
param=collector_endpoint,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_ENDPOINT),
default=None,
)
self.username = _parameter_setter(
param=username,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_USER),
default=None,
)
self.password = _parameter_setter(
param=password,
env_variable=environ.get(OTEL_EXPORTER_JAEGER_PASSWORD),
default=None,
self.collector_endpoint = collector_endpoint or environ.get(
OTEL_EXPORTER_JAEGER_ENDPOINT
)
self.username = username or environ.get(OTEL_EXPORTER_JAEGER_USER)
self.password = password or environ.get(OTEL_EXPORTER_JAEGER_PASSWORD)
self._collector = None
tracer_provider = trace.get_tracer_provider()
self.service_name = (
Expand All @@ -195,8 +182,12 @@ def _collector_http_client(self) -> Optional[Collector]:
if self.username is not None and self.password is not None:
auth = (self.username, self.password)

# Thrift HTTP Client expects timeout in millis
timeout_in_millis = self._timeout * 1000.0
self._collector = Collector(
thrift_url=self.collector_endpoint, auth=auth
thrift_url=self.collector_endpoint,
auth=auth,
timeout_in_millis=timeout_in_millis,
)
return self._collector

Expand Down Expand Up @@ -226,19 +217,3 @@ def export(self, spans) -> SpanExportResult:

def shutdown(self):
pass


def _parameter_setter(param, env_variable, default):
"""Returns value according to the provided data.
Args:
param: Constructor parameter value
env_variable: Environment variable related to the parameter
default: Constructor parameter default value
"""
if param is None:
res = env_variable or default
else:
res = param

return res
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,29 @@ def emit(self, batch: jaeger.Batch):


class Collector:
"""Submits collected spans to Thrift HTTP server.
"""Submits collected spans to Jaeger collector in jaeger.thrift
format over binary thrift protocol. This is recommend option in cases where
it is not feasible to deploy Jaeger Agent next to the application,
for example, when the application code is running as AWS Lambda function.
In these scenarios the Jaeger Clients can be configured to submit spans directly
to the Collectors over HTTP/HTTPS.
Args:
thrift_url: URL of the Jaeger HTTP Thrift.
thrift_url: Endpoint used to send spans
directly to Collector the over HTTP.
auth: Auth tuple that contains username and password for Basic Auth.
timeout_in_millis: timeout for THttpClient.
"""

def __init__(self, thrift_url="", auth=None):
def __init__(self, thrift_url="", auth=None, timeout_in_millis=None):
self.thrift_url = thrift_url
self.auth = auth
self.http_transport = THttpClient.THttpClient(
uri_or_host=self.thrift_url
)
if timeout_in_millis is not None:
self.http_transport.setTimeout(timeout_in_millis)

self.protocol = TBinaryProtocol.TBinaryProtocol(self.http_transport)

# set basic auth header
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
OTEL_EXPORTER_JAEGER_AGENT_PORT,
OTEL_EXPORTER_JAEGER_ENDPOINT,
OTEL_EXPORTER_JAEGER_PASSWORD,
OTEL_EXPORTER_JAEGER_TIMEOUT,
OTEL_EXPORTER_JAEGER_USER,
)
from opentelemetry.sdk.resources import SERVICE_NAME
Expand Down Expand Up @@ -148,6 +149,7 @@ def test_constructor_by_environment_variables(self):
OTEL_EXPORTER_JAEGER_ENDPOINT: collector_endpoint,
OTEL_EXPORTER_JAEGER_USER: username,
OTEL_EXPORTER_JAEGER_PASSWORD: password,
OTEL_EXPORTER_JAEGER_TIMEOUT: "20",
},
)

Expand All @@ -164,6 +166,7 @@ def test_constructor_by_environment_variables(self):
self.assertEqual(exporter.service_name, service)
self.assertEqual(exporter.agent_host_name, agent_host_name)
self.assertEqual(exporter.agent_port, int(agent_port))
self.assertEqual(exporter._timeout, 20)
self.assertTrue(exporter._collector_http_client is not None)
self.assertEqual(exporter.collector_endpoint, collector_endpoint)
self.assertEqual(exporter._collector_http_client.auth, auth)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,19 @@ def __init__(
else:
self.client = client

self.host_name = host_name
self.node = utils.get_node(service_name, host_name)

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
# Populate service_name from first span
# We restrict any SpanProcessor to be only associated with a single
# TracerProvider, so it is safe to assume that all Spans in a single
# batch all originate from one TracerProvider (and in turn have all
# the same service_name)
if spans:
service_name = spans[0].resource.attributes.get(SERVICE_NAME)
if service_name:
self.node = utils.get_node(service_name, self.host_name)
try:
responses = self.client.Export(self.generate_span_requests(spans))

Expand Down
Loading

0 comments on commit 64ae591

Please sign in to comment.