From ac900d275862bc22dc19b40c9e5f9db71687f790 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 4 Oct 2022 11:03:08 +0100 Subject: [PATCH 1/6] support stream property selection in table query --- singer_sdk/streams/sql.py | 67 +++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index 114c01674..c640c78d7 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -6,12 +6,13 @@ import logging from datetime import datetime from functools import lru_cache -from typing import Any, Iterable, cast +from typing import Any, Iterable, List, cast import sqlalchemy from sqlalchemy.engine import Engine from sqlalchemy.engine.reflection import Inspector +import singer_sdk.helpers._catalog as catalog from singer_sdk import typing as th from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema from singer_sdk.exceptions import ConfigValidationError @@ -325,11 +326,7 @@ def get_object_names( # Some DB providers do not understand 'views' self._warn_no_view_detection() view_names = [] - object_names = [(t, False) for t in table_names] + [ - (v, True) for v in view_names - ] - - return object_names + return [(t, False) for t in table_names] + [(v, True) for v in view_names] # TODO maybe should be splitted into smaller parts? def discover_catalog_entry( @@ -365,9 +362,13 @@ def discover_catalog_entry( pk_def = inspected.get_pk_constraint(table_name, schema=schema_name) if pk_def and "constrained_columns" in pk_def: possible_primary_keys.append(pk_def["constrained_columns"]) - for index_def in inspected.get_indexes(table_name, schema=schema_name): - if index_def.get("unique", False): - possible_primary_keys.append(index_def["column_names"]) + + possible_primary_keys.extend( + index_def["column_names"] + for index_def in inspected.get_indexes(table_name, schema=schema_name) + if index_def.get("unique", False) + ) + key_properties = next(iter(possible_primary_keys), None) # Initialize columns list @@ -397,7 +398,7 @@ def discover_catalog_entry( replication_method = next(reversed(["FULL_TABLE"] + addl_replication_methods)) # Create the catalog entry object - catalog_entry = CatalogEntry( + return CatalogEntry( tap_stream_id=unique_stream_id, stream=unique_stream_id, table=table_name, @@ -418,8 +419,6 @@ def discover_catalog_entry( replication_key=None, # Must be defined by user ) - return catalog_entry - def discover_catalog_entries(self) -> list[dict]: """Return a list of catalog entries from discovery. @@ -488,7 +487,9 @@ def table_exists(self, full_table_name: str) -> bool: sqlalchemy.inspect(self._engine).has_table(full_table_name), ) - def get_table_columns(self, full_table_name: str) -> dict[str, sqlalchemy.Column]: + def get_table_columns( + self, full_table_name: str, column_names: List[str] | None = None + ) -> dict[str, sqlalchemy.Column]: """Return a list of table columns. Args: @@ -501,17 +502,19 @@ def get_table_columns(self, full_table_name: str) -> dict[str, sqlalchemy.Column inspector = sqlalchemy.inspect(self._engine) columns = inspector.get_columns(table_name, schema_name) - result: dict[str, sqlalchemy.Column] = {} - for col_meta in columns: - result[col_meta["name"]] = sqlalchemy.Column( + return { + col_meta["name"]: sqlalchemy.Column( col_meta["name"], col_meta["type"], nullable=col_meta.get("nullable", False), ) + for col_meta in columns + if not column_names or col_meta["name"] in column_names + } - return result - - def get_table(self, full_table_name: str) -> sqlalchemy.Table: + def get_table( + self, full_table_name: str, column_names: List[str] | None = None + ) -> sqlalchemy.Table: """Return a table object. Args: @@ -520,7 +523,9 @@ def get_table(self, full_table_name: str) -> sqlalchemy.Table: Returns: A table object with column list. """ - columns = self.get_table_columns(full_table_name).values() + columns = self.get_table_columns( + full_table_name=full_table_name, column_names=column_names + ).values() _, schema_name, table_name = self.parse_full_table_name(full_table_name) meta = sqlalchemy.MetaData() return sqlalchemy.schema.Table( @@ -910,11 +915,7 @@ def __init__( connector: Optional connector to reuse. """ self._connector: SQLConnector - if connector: - self._connector = connector - else: - self._connector = self.connector_class(dict(tap.config)) - + self._connector = connector or self.connector_class(dict(tap.config)) self.catalog_entry = catalog_entry super().__init__( tap=tap, @@ -1016,8 +1017,16 @@ def fully_qualified_name(self) -> str: db_name=catalog_entry.database, ) - # Get records from stream + def get_selected_schema(self) -> dict: + """Return a copy of the Streams JSON schema, dropping any fields not selected.""" + return catalog.get_selected_schema( + stream_name=self.name, + schema=self.schema, + mask=self.mask, + logger=self.logger, + ) + # Get records from stream def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: """Return a generator of record-type dictionary objects. @@ -1041,7 +1050,11 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: f"Stream '{self.name}' does not support partitioning." ) - table = self.connector.get_table(self.fully_qualified_name) + selected_column_names = self.get_selected_schema()["properties"].keys() + table = self.connector.get_table( + full_table_name=self.fully_qualified_name, + column_names=selected_column_names, + ) query = table.select() if self.replication_key: replication_key_col = table.columns[self.replication_key] From 5362835bcd26c0233dcb6aebf76e9f74c070c07d Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 4 Oct 2022 11:09:19 +0100 Subject: [PATCH 2/6] added docstrings --- singer_sdk/streams/sql.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index c640c78d7..96b033876 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -494,6 +494,7 @@ def get_table_columns( Args: full_table_name: Fully qualified table name. + column_names: A list of column names to filter to. Returns: An ordered list of column objects. @@ -519,6 +520,7 @@ def get_table( Args: full_table_name: Fully qualified table name. + column_names: A list of column names to filter to. Returns: A table object with column list. @@ -1018,7 +1020,13 @@ def fully_qualified_name(self) -> str: ) def get_selected_schema(self) -> dict: - """Return a copy of the Streams JSON schema, dropping any fields not selected.""" + """Return a copy of the Stream JSON schema, dropping any fields not selected. + + Returns: + + A dictionary containing a copy of the Stream JSON schema, filtered + to any selection criteria. + """ return catalog.get_selected_schema( stream_name=self.name, schema=self.schema, From 8bc493e066c2bcb2d6b4f76236e74a5135d1ddbe Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 4 Oct 2022 11:31:06 +0100 Subject: [PATCH 3/6] more linting --- singer_sdk/streams/sql.py | 1 - 1 file changed, 1 deletion(-) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index 96b033876..389914aeb 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -1023,7 +1023,6 @@ def get_selected_schema(self) -> dict: """Return a copy of the Stream JSON schema, dropping any fields not selected. Returns: - A dictionary containing a copy of the Stream JSON schema, filtered to any selection criteria. """ From 533a51029e234705046f73249795e98ea63c05f1 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Tue, 4 Oct 2022 11:36:25 +0100 Subject: [PATCH 4/6] even more linting --- singer_sdk/streams/sql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index 389914aeb..fa3f8a2f7 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -488,7 +488,7 @@ def table_exists(self, full_table_name: str) -> bool: ) def get_table_columns( - self, full_table_name: str, column_names: List[str] | None = None + self, full_table_name: str, column_names: list[str] | None = None ) -> dict[str, sqlalchemy.Column]: """Return a list of table columns. @@ -514,7 +514,7 @@ def get_table_columns( } def get_table( - self, full_table_name: str, column_names: List[str] | None = None + self, full_table_name: str, column_names: list[str] | None = None ) -> sqlalchemy.Table: """Return a table object. From 0a88df4bf8857053f507814d3b126544d5a4d4c4 Mon Sep 17 00:00:00 2001 From: "Edgar R. M" Date: Tue, 4 Oct 2022 12:03:52 -0500 Subject: [PATCH 5/6] Update singer_sdk/streams/sql.py --- singer_sdk/streams/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index fa3f8a2f7..4457e4e23 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -6,7 +6,7 @@ import logging from datetime import datetime from functools import lru_cache -from typing import Any, Iterable, List, cast +from typing import Any, Iterable, cast import sqlalchemy from sqlalchemy.engine import Engine From dc0a3a48f1e4a461c90bb7a357f3d5c6ec3d0501 Mon Sep 17 00:00:00 2001 From: Ken Payne Date: Wed, 5 Oct 2022 15:41:39 +0100 Subject: [PATCH 6/6] implement case insensitive comparison --- singer_sdk/streams/sql.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index 4457e4e23..6a0392485 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -510,7 +510,8 @@ def get_table_columns( nullable=col_meta.get("nullable", False), ) for col_meta in columns - if not column_names or col_meta["name"] in column_names + if not column_names + or col_meta["name"].casefold() in {col.casefold() for col in column_names} } def get_table(