diff --git a/CHANGELOG.md b/CHANGELOG.md index 82fd3a23fe..c5dfd9e601 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-system-metrics` Add `process.` prefix to `runtime.memory`, `runtime.cpu.time`, and `runtime.gc_count`. Change `runtime.memory` from count to UpDownCounter. ([#1735](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1735)) - Add request and response hooks for GRPC instrumentation (client only) ([#1706](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1706)) +- Fix memory leak in SQLAlchemy instrumentation where disposed `Engine` does not get garbage collected + ([#1771](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1771) - `opentelemetry-instrumentation-pymemcache` Update instrumentation to support pymemcache >4 ([#1764](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1764)) - `opentelemetry-instrumentation-confluent-kafka` Add support for higher versions of confluent_kafka diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py index 9ff6057728..0e18bc9bed 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py @@ -13,6 +13,7 @@ # limitations under the License. import os import re +import weakref from sqlalchemy.event import ( # pylint: disable=no-name-in-module listen, @@ -99,11 +100,11 @@ def __init__( commenter_options=None, ): self.tracer = tracer - self.engine = engine self.connections_usage = connections_usage self.vendor = _normalize_vendor(engine.name) self.enable_commenter = enable_commenter self.commenter_options = commenter_options if commenter_options else {} + self._engine_attrs = _get_attributes_from_engine(engine) self._leading_comment_remover = re.compile(r"^/\*.*?\*/") self._register_event_listener( @@ -118,23 +119,11 @@ def __init__( self._register_event_listener(engine, "checkin", self._pool_checkin) self._register_event_listener(engine, "checkout", self._pool_checkout) - def _get_connection_string(self): - drivername = self.engine.url.drivername or "" - host = self.engine.url.host or "" - port = self.engine.url.port or "" - database = self.engine.url.database or "" - return f"{drivername}://{host}:{port}/{database}" - - def _get_pool_name(self): - if self.engine.pool.logging_name is not None: - return self.engine.pool.logging_name - return self._get_connection_string() - def _add_idle_to_connection_usage(self, value): self.connections_usage.add( value, attributes={ - "pool.name": self._get_pool_name(), + **self._engine_attrs, "state": "idle", }, ) @@ -143,7 +132,7 @@ def _add_used_to_connection_usage(self, value): self.connections_usage.add( value, attributes={ - "pool.name": self._get_pool_name(), + **self._engine_attrs, "state": "used", }, ) @@ -169,12 +158,21 @@ def _pool_checkout( @classmethod def _register_event_listener(cls, target, identifier, func, *args, **kw): listen(target, identifier, func, *args, **kw) - cls._remove_event_listener_params.append((target, identifier, func)) + cls._remove_event_listener_params.append( + (weakref.ref(target), identifier, func) + ) @classmethod def remove_all_event_listeners(cls): - for remove_params in cls._remove_event_listener_params: - remove(*remove_params) + for ( + weak_ref_target, + identifier, + func, + ) in cls._remove_event_listener_params: + # Remove an event listener only if saved weak reference points to an object + # which has not been garbage collected + if weak_ref_target() is not None: + remove(weak_ref_target(), identifier, func) cls._remove_event_listener_params.clear() def _operation_name(self, db_name, statement): @@ -300,3 +298,22 @@ def _get_attributes_from_cursor(vendor, cursor, attrs): if info.port: attrs[SpanAttributes.NET_PEER_PORT] = int(info.port) return attrs + + +def _get_connection_string(engine): + drivername = engine.url.drivername or "" + host = engine.url.host or "" + port = engine.url.port or "" + database = engine.url.database or "" + return f"{drivername}://{host}:{port}/{database}" + + +def _get_attributes_from_engine(engine): + """Set metadata attributes of the database engine""" + attrs = {} + + attrs["pool.name"] = getattr( + getattr(engine, "pool", None), "logging_name", None + ) or _get_connection_string(engine) + + return attrs diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py index 981da107db..8f706ca8c8 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py @@ -307,3 +307,26 @@ def test_no_op_tracer_provider(self): cnx.execute("SELECT 1 + 1;").fetchall() spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 0) + + def test_no_memory_leakage_if_engine_diposed(self): + SQLAlchemyInstrumentor().instrument() + import gc + import weakref + + from sqlalchemy import create_engine + + callback = mock.Mock() + + def make_shortlived_engine(): + engine = create_engine("sqlite:///:memory:") + # Callback will be called if engine is deallocated during garbage + # collection + weakref.finalize(engine, callback) + with engine.connect() as conn: + conn.execute("SELECT 1 + 1;").fetchall() + + for _ in range(0, 5): + make_shortlived_engine() + + gc.collect() + assert callback.call_count == 5