Skip to content

Commit

Permalink
Merge branch 'main' into emptyroute
Browse files Browse the repository at this point in the history
  • Loading branch information
shalevr authored Jul 4, 2023
2 parents 9e6be51 + dadcd01 commit 974f806
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 36 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1870](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1870))
- Update falcon instrumentation to follow semantic conventions
([#1824](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1824))
- Fix sqlalchemy instrumentation wrap methods to accept sqlcommenter options([#1873](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1873))

### Added

Expand All @@ -36,12 +37,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1833](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1833))
- Fix elasticsearch `Transport.perform_request` instrument wrap for elasticsearch >= 8
([#1810](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1810))
- `opentelemetry-instrumentation-urllib3` Add support for urllib3 version 2
([#1879](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1879))

## Version 1.18.0/0.39b0 (2023-05-10)

- `opentelemetry-instrumentation-system-metrics` Add `process.` prefix to `runtime.memory`, `runtime.cpu.time`, and `runtime.gc_count`. Change `runtime.memory` from count to UpDownCounter. ([#1735](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1735))
- Add request and response hooks for GRPC instrumentation (client only)
([#1706](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1706))
- Fix memory leak in SQLAlchemy instrumentation where disposed `Engine` does not get garbage collected
([#1771](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1771)
- `opentelemetry-instrumentation-pymemcache` Update instrumentation to support pymemcache >4
([#1764](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1764))
- `opentelemetry-instrumentation-confluent-kafka` Add support for higher versions of confluent_kafka
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ depend on `opentelemetry-sdk` or another package that implements the API.
**Please note** that these libraries are currently in _beta_, and shouldn't
generally be used in production environments.

Unless explicitly stated otherwise, any instrumentation here for a particular library is not developed or maintained by the authors of such library.

The
[`instrumentation/`](https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation)
directory includes OpenTelemetry instrumentation packages, which can be installed
Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@
| [opentelemetry-instrumentation-tornado](./opentelemetry-instrumentation-tornado) | tornado >= 5.1.1 | Yes
| [opentelemetry-instrumentation-tortoiseorm](./opentelemetry-instrumentation-tortoiseorm) | tortoise-orm >= 0.17.0 | No
| [opentelemetry-instrumentation-urllib](./opentelemetry-instrumentation-urllib) | urllib | Yes
| [opentelemetry-instrumentation-urllib3](./opentelemetry-instrumentation-urllib3) | urllib3 >= 1.0.0, < 2.0.0 | Yes
| [opentelemetry-instrumentation-urllib3](./opentelemetry-instrumentation-urllib3) | urllib3 >= 1.0.0, < 3.0.0 | Yes
| [opentelemetry-instrumentation-wsgi](./opentelemetry-instrumentation-wsgi) | wsgi | Yes
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ def _instrument(self, **kwargs):
``engine``: a SQLAlchemy engine instance
``engines``: a list of SQLAlchemy engine instances
``tracer_provider``: a TracerProvider, defaults to global
``meter_provider``: a MeterProvider, defaults to global
``enable_commenter``: bool to enable sqlcommenter, defaults to False
``commenter_options``: dict of sqlcommenter config, defaults to None
Returns:
An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise.
Expand All @@ -151,16 +154,21 @@ def _instrument(self, **kwargs):
)

enable_commenter = kwargs.get("enable_commenter", False)
commenter_options = kwargs.get("commenter_options", {})

_w(
"sqlalchemy",
"create_engine",
_wrap_create_engine(tracer, connections_usage, enable_commenter),
_wrap_create_engine(
tracer, connections_usage, enable_commenter, commenter_options
),
)
_w(
"sqlalchemy.engine",
"create_engine",
_wrap_create_engine(tracer, connections_usage, enable_commenter),
_wrap_create_engine(
tracer, connections_usage, enable_commenter, commenter_options
),
)
_w(
"sqlalchemy.engine.base",
Expand All @@ -172,7 +180,10 @@ def _instrument(self, **kwargs):
"sqlalchemy.ext.asyncio",
"create_async_engine",
_wrap_create_async_engine(
tracer, connections_usage, enable_commenter
tracer,
connections_usage,
enable_commenter,
commenter_options,
),
)
if kwargs.get("engine") is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
import os
import re
import weakref

from sqlalchemy.event import ( # pylint: disable=no-name-in-module
listen,
Expand Down Expand Up @@ -42,7 +43,7 @@ def _normalize_vendor(vendor):


def _wrap_create_async_engine(
tracer, connections_usage, enable_commenter=False
tracer, connections_usage, enable_commenter=False, commenter_options=None
):
# pylint: disable=unused-argument
def _wrap_create_async_engine_internal(func, module, args, kwargs):
Expand All @@ -51,20 +52,32 @@ def _wrap_create_async_engine_internal(func, module, args, kwargs):
"""
engine = func(*args, **kwargs)
EngineTracer(
tracer, engine.sync_engine, connections_usage, enable_commenter
tracer,
engine.sync_engine,
connections_usage,
enable_commenter,
commenter_options,
)
return engine

return _wrap_create_async_engine_internal


def _wrap_create_engine(tracer, connections_usage, enable_commenter=False):
def _wrap_create_engine(
tracer, connections_usage, enable_commenter=False, commenter_options=None
):
def _wrap_create_engine_internal(func, _module, args, kwargs):
"""Trace the SQLAlchemy engine, creating an `EngineTracer`
object that will listen to SQLAlchemy events.
"""
engine = func(*args, **kwargs)
EngineTracer(tracer, engine, connections_usage, enable_commenter)
EngineTracer(
tracer,
engine,
connections_usage,
enable_commenter,
commenter_options,
)
return engine

return _wrap_create_engine_internal
Expand Down Expand Up @@ -99,11 +112,11 @@ def __init__(
commenter_options=None,
):
self.tracer = tracer
self.engine = engine
self.connections_usage = connections_usage
self.vendor = _normalize_vendor(engine.name)
self.enable_commenter = enable_commenter
self.commenter_options = commenter_options if commenter_options else {}
self._engine_attrs = _get_attributes_from_engine(engine)
self._leading_comment_remover = re.compile(r"^/\*.*?\*/")

self._register_event_listener(
Expand All @@ -118,23 +131,11 @@ def __init__(
self._register_event_listener(engine, "checkin", self._pool_checkin)
self._register_event_listener(engine, "checkout", self._pool_checkout)

def _get_connection_string(self):
drivername = self.engine.url.drivername or ""
host = self.engine.url.host or ""
port = self.engine.url.port or ""
database = self.engine.url.database or ""
return f"{drivername}://{host}:{port}/{database}"

def _get_pool_name(self):
if self.engine.pool.logging_name is not None:
return self.engine.pool.logging_name
return self._get_connection_string()

def _add_idle_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
**self._engine_attrs,
"state": "idle",
},
)
Expand All @@ -143,7 +144,7 @@ def _add_used_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
**self._engine_attrs,
"state": "used",
},
)
Expand All @@ -169,12 +170,21 @@ def _pool_checkout(
@classmethod
def _register_event_listener(cls, target, identifier, func, *args, **kw):
listen(target, identifier, func, *args, **kw)
cls._remove_event_listener_params.append((target, identifier, func))
cls._remove_event_listener_params.append(
(weakref.ref(target), identifier, func)
)

@classmethod
def remove_all_event_listeners(cls):
for remove_params in cls._remove_event_listener_params:
remove(*remove_params)
for (
weak_ref_target,
identifier,
func,
) in cls._remove_event_listener_params:
# Remove an event listener only if saved weak reference points to an object
# which has not been garbage collected
if weak_ref_target() is not None:
remove(weak_ref_target(), identifier, func)
cls._remove_event_listener_params.clear()

def _operation_name(self, db_name, statement):
Expand Down Expand Up @@ -300,3 +310,22 @@ def _get_attributes_from_cursor(vendor, cursor, attrs):
if info.port:
attrs[SpanAttributes.NET_PEER_PORT] = int(info.port)
return attrs


def _get_connection_string(engine):
drivername = engine.url.drivername or ""
host = engine.url.host or ""
port = engine.url.port or ""
database = engine.url.database or ""
return f"{drivername}://{host}:{port}/{database}"


def _get_attributes_from_engine(engine):
"""Set metadata attributes of the database engine"""
attrs = {}

attrs["pool.name"] = getattr(
getattr(engine, "pool", None), "logging_name", None
) or _get_connection_string(engine)

return attrs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
from unittest import mock

import pytest
Expand Down Expand Up @@ -176,6 +177,43 @@ def test_create_engine_wrapper(self):
"opentelemetry.instrumentation.sqlalchemy",
)

def test_create_engine_wrapper_enable_commenter(self):
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
SQLAlchemyInstrumentor().instrument(
enable_commenter=True,
commenter_options={"db_framework": False},
)
from sqlalchemy import create_engine # pylint: disable-all

engine = create_engine("sqlite:///:memory:")
cnx = engine.connect()
cnx.execute("SELECT 1;").fetchall()
# sqlcommenter
self.assertRegex(
self.caplog.records[-2].getMessage(),
r"SELECT 1 /\*db_driver='(.*)',traceparent='\d{1,2}-[a-zA-Z0-9_]{32}-[a-zA-Z0-9_]{16}-\d{1,2}'\*/;",
)

def test_create_engine_wrapper_enable_commenter_otel_values_false(self):
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
SQLAlchemyInstrumentor().instrument(
enable_commenter=True,
commenter_options={
"db_framework": False,
"opentelemetry_values": False,
},
)
from sqlalchemy import create_engine # pylint: disable-all

engine = create_engine("sqlite:///:memory:")
cnx = engine.connect()
cnx.execute("SELECT 1;").fetchall()
# sqlcommenter
self.assertRegex(
self.caplog.records[-2].getMessage(),
r"SELECT 1 /\*db_driver='(.*)'\*/;",
)

def test_custom_tracer_provider(self):
provider = TracerProvider(
resource=Resource.create(
Expand Down Expand Up @@ -242,6 +280,65 @@ async def run():

asyncio.get_event_loop().run_until_complete(run())

@pytest.mark.skipif(
not sqlalchemy.__version__.startswith("1.4"),
reason="only run async tests for 1.4",
)
def test_create_async_engine_wrapper_enable_commenter(self):
async def run():
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
SQLAlchemyInstrumentor().instrument(
enable_commenter=True,
commenter_options={
"db_framework": False,
},
)
from sqlalchemy.ext.asyncio import ( # pylint: disable-all
create_async_engine,
)

engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.connect() as cnx:
await cnx.execute(sqlalchemy.text("SELECT 1;"))
# sqlcommenter
self.assertRegex(
self.caplog.records[1].getMessage(),
r"SELECT 1 /\*db_driver='(.*)',traceparent='\d{1,2}-[a-zA-Z0-9_]{32}-[a-zA-Z0-9_]{16}-\d{1,2}'\*/;",
)

asyncio.get_event_loop().run_until_complete(run())

@pytest.mark.skipif(
not sqlalchemy.__version__.startswith("1.4"),
reason="only run async tests for 1.4",
)
def test_create_async_engine_wrapper_enable_commenter_otel_values_false(
self,
):
async def run():
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
SQLAlchemyInstrumentor().instrument(
enable_commenter=True,
commenter_options={
"db_framework": False,
"opentelemetry_values": False,
},
)
from sqlalchemy.ext.asyncio import ( # pylint: disable-all
create_async_engine,
)

engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.connect() as cnx:
await cnx.execute(sqlalchemy.text("SELECT 1;"))
# sqlcommenter
self.assertRegex(
self.caplog.records[1].getMessage(),
r"SELECT 1 /\*db_driver='(.*)'\*/;",
)

asyncio.get_event_loop().run_until_complete(run())

def test_uninstrument(self):
engine = create_engine("sqlite:///:memory:")
SQLAlchemyInstrumentor().instrument(
Expand Down Expand Up @@ -307,3 +404,26 @@ def test_no_op_tracer_provider(self):
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)

def test_no_memory_leakage_if_engine_diposed(self):
SQLAlchemyInstrumentor().instrument()
import gc
import weakref

from sqlalchemy import create_engine

callback = mock.Mock()

def make_shortlived_engine():
engine = create_engine("sqlite:///:memory:")
# Callback will be called if engine is deallocated during garbage
# collection
weakref.finalize(engine, callback)
with engine.connect() as conn:
conn.execute("SELECT 1 + 1;").fetchall()

for _ in range(0, 5):
make_shortlived_engine()

gc.collect()
assert callback.call_count == 5
Loading

0 comments on commit 974f806

Please sign in to comment.