feat: Auto populate -pks when not provided by user #1324

Merged: 22 commits, Nov 21, 2024

Changes from all commits (22 commits)
00e2ca3
feat: Auto populate -pks from Oracle constraint
nj1973 Nov 6, 2024
6b188c4
feat: Auto populate -pks from Teradata constraint
nj1973 Nov 7, 2024
eb99fc9
feat: Auto populate -pks from PostgreSQL constraint
nj1973 Nov 7, 2024
a03d6c9
feat: Auto populate -pks from PostgreSQL constraint
nj1973 Nov 7, 2024
0e610b0
feat: Auto populate -pks from SQL Server, DB2 constraint
nj1973 Nov 8, 2024
b9d7a13
feat: Auto populate -pks no-op for BigQuery
nj1973 Nov 8, 2024
519078b
feat: Auto populate -pks no-op for Snowflake and MySQL
nj1973 Nov 11, 2024
8413def
Merge branch 'develop' into 1253-row-validation-primary-keys-auto
nj1973 Nov 12, 2024
ce886d8
tests: Fix DB2 test
nj1973 Nov 12, 2024
c0f3dc1
Merge branch 'develop' into 1253-row-validation-primary-keys-auto
nj1973 Nov 15, 2024
864cc28
Merge branch 'develop' into 1253-row-validation-primary-keys-auto
helensilva14 Nov 18, 2024
5df5246
Update third_party/ibis/ibis_biquery/api.py
nj1973 Nov 19, 2024
f445ef1
Update third_party/ibis/ibis_cloud_spanner/__init__.py
nj1973 Nov 19, 2024
92300b9
Update third_party/ibis/ibis_redshift/__init__.py
nj1973 Nov 19, 2024
ecfca15
Update data_validation/cli_tools.py
nj1973 Nov 19, 2024
2cd80a7
Update data_validation/config_manager.py
nj1973 Nov 19, 2024
f92301d
chore: Refactor PostgreSQL catalog query
nj1973 Nov 19, 2024
04dbc2e
chore: PR comment
nj1973 Nov 19, 2024
dd543d3
chore: PR comment
nj1973 Nov 19, 2024
45d232e
chore: PR comment changes
nj1973 Nov 20, 2024
5f4b4e4
docs: Doc updates for --primay-keys changes
nj1973 Nov 20, 2024
0be73b1
Merge branch 'develop' into 1253-row-validation-primary-keys-auto
nj1973 Nov 20, 2024
25 changes: 14 additions & 11 deletions README.md
@@ -156,17 +156,16 @@ In addition, please note that SHA256 is not a supported function on Teradata sys
If you wish to perform this comparison on Teradata you will need to
[deploy a UDF to perform the conversion](https://github.com/akuroda/teradata-udf-sha2/blob/master/src/sha256.c).)

Below is the command syntax for row validations. In order to run row level
validations you need to pass a `--primary-key` flag which defines what field(s)
the validation will be compared on, as well as either the `--comparison-fields` flag
or the `--hash` flag. See *Primary Keys* section
Below is the command syntax for row validations. Row-level validations require unique columns
to join row sets, which are either inferred from the source/target table or provided via the
`--primary-keys` flag, plus one of the `--hash`, `--concat` or `--comparison-fields` flags.
See the *Primary Keys* section.

The `--comparison-fields` flag specifies the values (e.g. columns) whose raw values will be compared
based on the primary key join. The `--hash` flag will run a checksum across specified columns in
the table. This will include casting to string, sanitizing the data (ifnull, rtrim, upper), concatenating,
and finally hashing the row.


Under the hood, row validation uses
[Calculated Fields](https://github.com/GoogleCloudPlatform/professional-services-data-validator#calculated-fields) to
apply functions such as IFNULL() or RTRIM(). These can be edited in the YAML or JSON config file to customize your row validation.
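
As a rough illustration of that sanitize-and-hash chain, here is a minimal sketch; the helper name, the VARCHAR cast and the 'N/A' default are assumptions for the example, not this PR's code (DVT builds the real expressions per engine via Calculated Fields).

```python
# Illustrative sketch of the cast -> ifnull -> rtrim -> upper -> concat -> hash chain.
# The helper name and the 'N/A' default value are hypothetical, not DVT internals.
from typing import List


def build_hash_sql(columns: List[str]) -> str:
    """Return a SQL expression hashing a sanitized concatenation of the given columns."""
    sanitized = [
        f"UPPER(RTRIM(IFNULL(CAST({col} AS VARCHAR(4000)), 'N/A')))" for col in columns
    ]
    return f"SHA256(CONCAT({', '.join(sanitized)}))"


print(build_hash_sql(["col_int8", "col_string"]))
```
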
@@ -188,13 +187,14 @@ data-validation
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
i.e 'bigquery-public-data.new_york_citibike.citibike_trips'
--primary-keys or -pk PRIMARY_KEYS
Comma separated list of columns to use as primary keys. See *Primary Keys* section
--comparison-fields or -comp-fields FIELDS
Comma separated list of columns to compare. Can either be a physical column or an alias
See: *Calculated Fields* section for details
--hash COLUMNS Comma separated list of columns to hash or * for all columns
--concat COLUMNS Comma separated list of columns to concatenate or * for all columns (use if a common hash function is not available between databases)
[--primary-keys PRIMARY_KEYS, -pk PRIMARY_KEYS]
Comma separated list of primary key columns, when not specified the value will be inferred
from the source or target table if available. See *Primary Keys* section
[--exclude-columns or -ec]
Flag to indicate the list of columns provided should be excluded from hash or concat instead of included.
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
@@ -262,8 +262,6 @@ data-validation
Either --tables-list or --source-query (or file) and --target-query (or file) must be provided
--target-query-file TARGET_QUERY_FILE, -tqf TARGET_QUERY_FILE
File containing the target sql command. Supports GCS and local paths.
--primary-keys PRIMARY_KEYS, -pk PRIMARY_KEYS
Comma separated list of primary key columns 'col_a,col_b'. See *Primary Keys* section
--comparison-fields or -comp-fields FIELDS
Comma separated list of columns to compare. Can either be a physical column or an alias
See: *Calculated Fields* section for details
@@ -277,6 +275,9 @@ data-validation
--partition-num INT, -pn INT
Number of partitions into which the table should be split, e.g. 1000 or 10000
In case this value exceeds the row count of the source/target table, it will be decreased to max(source_row_count, target_row_count)
[--primary-keys PRIMARY_KEYS, -pk PRIMARY_KEYS]
Comma separated list of primary key columns, when not specified the value will be inferred
from the source or target table if available. See *Primary Keys* section
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
BigQuery destination for validation results. Defaults to stdout.
See: *Validation Reports* section
@@ -448,8 +449,8 @@ data-validation
--hash '*' '*' to hash all columns.
--concat COLUMNS Comma separated list of columns to concatenate or * for all columns
(use if a common hash function is not available between databases)
--primary-key or -pk JOIN_KEY
Common column between source and target tables for join
[--primary-keys PRIMARY_KEYS, -pk PRIMARY_KEYS]
Common column between source and target queries for join
[--exclude-columns or -ec]
Flag to indicate the list of columns provided should be excluded from hash or concat instead of included.
[--bq-result-handler or -bqrh PROJECT_ID.DATASET.TABLE]
@@ -679,6 +680,8 @@ In many cases, validations (e.g. count, min, max etc) produce one row per table.
and target table is to compare the value for each column in the source with the value of the column in the target.
`grouped-columns` validation and `validate row` produce multiple rows per table. Data Validation Tool needs one or more columns to uniquely identify each row so the source and target can be compared. Data Validation Tool refers to these columns as primary keys. These do not need to be primary keys in the table. The only requirement is that the keys uniquely identify the row in the results.

These columns are inferred, where possible, from the source/target table or can be provided via the `--primary-keys` flag.
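
A minimal sketch of that resolution order (explicit `--primary-keys` first, then a source table constraint, then a target table constraint), using a hypothetical standalone helper rather than the tool's actual internals:

```python
# Hypothetical helper mirroring the resolution order described above; the real logic
# lives in data_validation/__main__.py and ConfigManager.auto_list_primary_keys().
from typing import List, Optional


def resolve_primary_keys(
    cli_value: Optional[str],
    source_pks: List[str],
    target_pks: List[str],
) -> List[str]:
    """Prefer --primary-keys; otherwise fall back to source then target constraints."""
    if cli_value:
        return [col.casefold() for col in cli_value.split(",")]
    inferred = source_pks or target_pks
    if not inferred:
        raise ValueError(
            "No primary keys were provided and neither the source or target "
            "tables have primary keys. Please include --primary-keys argument"
        )
    return [col.casefold() for col in inferred]


print(resolve_primary_keys(None, ["id"], []))  # ['id']
```
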

### Grouped Columns

Grouped Columns contain the fields you want your aggregations to be broken out
29 changes: 17 additions & 12 deletions data_validation/__main__.py
@@ -219,7 +219,9 @@ def _get_calculated_config(args, config_manager: ConfigManager) -> List[dict]:
return calculated_configs


def _get_comparison_config(args, config_manager: ConfigManager) -> List[dict]:
def _get_comparison_config(
args, config_manager: ConfigManager, primary_keys: list
) -> List[dict]:
col_list = (
None
if args.comparison_fields == "*"
@@ -230,11 +232,7 @@ def _get_comparison_config(args, config_manager: ConfigManager) -> List[dict]:
args.exclude_columns,
)
# We can't have the PK columns in the comparison SQL twice therefore filter them out here if included.
comparison_fields = [
_
for _ in comparison_fields
if _ not in cli_tools.get_arg_list(args.primary_keys.casefold())
]
comparison_fields = [_ for _ in comparison_fields if _ not in primary_keys]

# As per #1190, add rstrip for Teradata string comparison fields
if (
@@ -314,18 +312,25 @@ def build_config_from_args(args: Namespace, config_manager: ConfigManager):
_get_calculated_config(args, config_manager)
)

# Append Comparison fields
if args.comparison_fields:
config_manager.append_comparison_fields(
_get_comparison_config(args, config_manager)
)

# Append primary_keys
primary_keys = cli_tools.get_arg_list(args.primary_keys)
if not primary_keys and config_manager.validation_type != consts.CUSTOM_QUERY:
primary_keys = config_manager.auto_list_primary_keys()
if not primary_keys:
raise ValueError(
"No primary keys were provided and neither the source or target tables have primary keys. Please include --primary-keys argument"

Review comment (Sundar Mudupalli, Collaborator): I tried a custom query without any primary keys and got the following error: "ValueError: No primary keys were provided and neither the source or target tables have primary keys. Please include --primary-keys argument". Is this sufficient or should we say "Custom query validations must provide primary keys. No primary keys were provided and neither the source or target tables have primary keys. Please include --primary-keys argument"? I am OK either way.

Reply (nj1973, Contributor, Author): I left it as it is; it's not totally accurate but makes it clear you need to provide the primary keys. Thanks for the good review, both you and Helen provided valuable direction for me.

)
primary_keys = [_.casefold() for _ in primary_keys]
config_manager.append_primary_keys(
config_manager.build_column_configs(primary_keys)
)

# Append Comparison fields
if args.comparison_fields:
config_manager.append_comparison_fields(
_get_comparison_config(args, config_manager, primary_keys)
)

return config_manager


35 changes: 20 additions & 15 deletions data_validation/cli_tools.py
@@ -170,6 +170,12 @@
],
}

VALIDATE_HELP_TEXT = "Run a validation and optionally store to config"
VALIDATE_COLUMN_HELP_TEXT = "Run a column validation"
VALIDATE_ROW_HELP_TEXT = "Run a row validation"
VALIDATE_SCHEMA_HELP_TEXT = "Run a schema validation"
VALIDATE_CUSTOM_QUERY_HELP_TEXT = "Run a custom query validation"


def _check_custom_query_args(parser: argparse.ArgumentParser, parsed_args: Namespace):
# This is where we make additional checks if the arguments provided are what we expect
@@ -471,9 +477,7 @@ def _configure_database_specific_parsers(parser):

def _configure_validate_parser(subparsers):
"""Configure arguments to run validations."""
validate_parser = subparsers.add_parser(
"validate", help="Run a validation and optionally store to config"
)
validate_parser = subparsers.add_parser("validate", help=VALIDATE_HELP_TEXT)

validate_parser.add_argument(
"--dry-run",
@@ -485,22 +489,22 @@ def _configure_validate_parser(subparsers):
validate_subparsers = validate_parser.add_subparsers(dest="validate_cmd")

column_parser = validate_subparsers.add_parser(
"column", help="Run a column validation"
"column", help=VALIDATE_COLUMN_HELP_TEXT
)
_configure_column_parser(column_parser)

row_parser = validate_subparsers.add_parser("row", help="Run a row validation")
row_parser = validate_subparsers.add_parser("row", help=VALIDATE_ROW_HELP_TEXT)
optional_arguments = row_parser.add_argument_group("optional arguments")
required_arguments = row_parser.add_argument_group("required arguments")
_configure_row_parser(row_parser, optional_arguments, required_arguments)

schema_parser = validate_subparsers.add_parser(
"schema", help="Run a schema validation"
"schema", help=VALIDATE_SCHEMA_HELP_TEXT
)
_configure_schema_parser(schema_parser)

custom_query_parser = validate_subparsers.add_parser(
"custom-query", help="Run a custom query validation"
"custom-query", help=VALIDATE_CUSTOM_QUERY_HELP_TEXT
)
_configure_custom_query_parser(custom_query_parser)

@@ -514,6 +518,15 @@ def _configure_row_parser(
):
"""Configure arguments to run row level validations."""
# Group optional arguments
optional_arguments.add_argument(
"--primary-keys",
"-pk",
help=(
"Comma separated list of primary key columns 'col_a,col_b', "
"when not specified the value will be inferred from the source or target table if available"
),
)

optional_arguments.add_argument(
"--threshold",
"-th",
@@ -586,14 +599,6 @@ def _configure_row_parser(
help="Comma separated tables list in the form 'schema.table=target_schema.target_table'",
)

# Group required arguments
required_arguments.add_argument(
"--primary-keys",
"-pk",
required=True,
help="Comma separated list of primary key columns 'col_a,col_b'",
)

# Group for mutually exclusive required arguments. Either must be supplied
mutually_exclusive_arguments = required_arguments.add_mutually_exclusive_group(
required=True
17 changes: 17 additions & 0 deletions data_validation/config_manager.py
@@ -1177,3 +1177,20 @@ def build_comp_fields(self, col_list: list, exclude_cols: bool = False) -> dict:
)

return casefold_source_columns

def auto_list_primary_keys(self) -> list:
"""Returns a list of primary key columns based on the source/target table.

If neither source nor target systems have a primary key defined then [] is returned.
"""
assert (
self.validation_type != consts.CUSTOM_QUERY
), "Custom query validations should not be able to reach this method"
primary_keys = self.source_client.list_primary_key_columns(
self.source_schema, self.source_table
)
if not primary_keys:
primary_keys = self.target_client.list_primary_key_columns(
self.target_schema, self.target_table
)
return primary_keys or []
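
The fallback above relies on each backend client exposing a `list_primary_key_columns()` method, implemented per engine elsewhere in this PR (not shown in this hunk). As a hedged illustration of the kind of catalog lookup such a method might perform, here is a standalone PostgreSQL sketch; the function signature and query are assumptions for the example, not the PR's implementation.

```python
# Illustrative only: a standalone PostgreSQL primary-key lookup, roughly the kind of
# catalog query a backend's list_primary_key_columns() could issue. Not DVT code.
from typing import List

PG_PK_SQL = """
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a
  ON a.attrelid = i.indrelid
 AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = %(tab)s::regclass
  AND i.indisprimary
ORDER BY a.attnum
"""


def list_primary_key_columns(cursor, schema: str, table: str) -> List[str]:
    """Return primary key column names for schema.table, or [] when none are defined."""
    cursor.execute(PG_PK_SQL, {"tab": f"{schema}.{table}"})
    return [row[0] for row in cursor.fetchall()]
```
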
2 changes: 1 addition & 1 deletion tests/resources/snowflake_test_tables.sql
@@ -13,7 +13,7 @@
-- limitations under the License.

CREATE OR REPLACE TABLE PSO_DATA_VALIDATOR.PUBLIC.DVT_CORE_TYPES (
ID INT NOT NULL,
ID INT NOT NULL PRIMARY KEY,
COL_INT8 TINYINT,
COL_INT16 SMALLINT,
COL_INT32 INT,
2 changes: 1 addition & 1 deletion tests/system/data_sources/common_functions.py
@@ -292,7 +292,7 @@ def row_validation_test(
f"-tc={tc}",
f"-tbls={tables}",
f"--filters={filters}",
f"--primary-keys={primary_keys}",
f"--primary-keys={primary_keys}" if primary_keys else None,
"--filter-status=fail",
f"--comparison-fields={comp_fields}" if comp_fields else f"--hash={hash}",
"--use-random-row" if use_randow_row else None,
18 changes: 18 additions & 0 deletions tests/system/data_sources/test_bigquery.py
@@ -1245,6 +1245,24 @@ def test_row_validation_core_types(mock_conn):
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
return_value=BQ_CONN,
)
def test_row_validation_core_types_auto_pks(mock_conn):
"""Test auto population of -pks from BigQuery - expect an exception.

Expects:
ValueError: --primary-keys argument is required for this validation
"""
with pytest.raises(ValueError):
row_validation_test(
tc="mock-conn",
hash="col_int8,col_int16",
primary_keys=None,
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
return_value=BQ_CONN,
14 changes: 14 additions & 0 deletions tests/system/data_sources/test_db2.py
@@ -152,6 +152,20 @@ def test_row_validation_core_types():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_row_validation_core_types_auto_pks():
"""Test auto population of -pks from DB2 defined constraint."""
row_validation_test(
tables="db2inst1.dvt_core_types",
tc="mock-conn",
hash="col_string",
primary_keys=None,
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
13 changes: 13 additions & 0 deletions tests/system/data_sources/test_mysql.py
@@ -258,6 +258,19 @@ def test_row_validation_core_types():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_row_validation_core_types_auto_pks():
"""Test auto population of -pks from MySQL defined constraint."""
row_validation_test(
tc="mock-conn",
hash="col_int8,col_int16",
primary_keys=None,
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
13 changes: 13 additions & 0 deletions tests/system/data_sources/test_oracle.py
@@ -325,6 +325,19 @@ def test_row_validation_core_types():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_row_validation_core_types_auto_pks():
"""Test auto population of -pks from Oracle defined constraint."""
row_validation_test(
tc="mock-conn",
hash="col_int8,col_int16",
primary_keys=None,
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
16 changes: 15 additions & 1 deletion tests/system/data_sources/test_postgres.py
@@ -682,12 +682,26 @@ def test_row_validation_pg_types():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_row_validation_core_types_auto_pks():
"""Test auto population of -pks from PostgreSQL defined constraint."""
row_validation_test(
tables="pso_data_validator.dvt_core_types",
tc="mock-conn",
hash="col_int8,col_int16",
primary_keys=None,
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_row_validation_comp_fields_pg_types():
"""PostgreSQL to PostgreSQL dvt_core_types row validation with --comp-fields"""
"""PostgreSQL to PostgreSQL dvt_pg_types row validation --comp-fields"""
row_validation_test(
tables="pso_data_validator.dvt_pg_types",
tc="mock-conn",
14 changes: 14 additions & 0 deletions tests/system/data_sources/test_snowflake.py
@@ -264,6 +264,20 @@ def test_row_validation_core_types():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_row_validation_core_types_auto_pks():
"""Test auto population of -pks from Snowflake defined constraint."""
row_validation_test(
tables="PSO_DATA_VALIDATOR.PUBLIC.DVT_CORE_TYPES",
tc="mock-conn",
hash="col_int8,col_int16",
primary_keys=None,
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,