Skip to content

Commit

Permalink
feat(taps): Add checks for primary keys, replication keys and state p…
Browse files Browse the repository at this point in the history
…artitioning keys to standard tap tests (#829)

* Add code validation tests to standard tests

* Add extra test for replication keys

* Apply suggestions from code review

Co-authored-by: Edgar R. M. <edgarrm358@gmail.com>

* Fix formatting for ci check

Co-authored-by: Edgar R. M. <edgarrm358@gmail.com>
  • Loading branch information
laurentS and edgarrmondragon authored Sep 1, 2022
1 parent 16751ee commit 76eed0d
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion singer_sdk/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,52 @@ def _test_stream_connections() -> None:
tap1: Tap = tap_class(config=config, parse_env_config=True)
tap1.run_connection_test()

return [_test_cli_prints, _test_discovery, _test_stream_connections]
def _test_pkeys_in_schema() -> None:
"""Verify that primary keys are actually in the stream's schema."""
tap = tap_class(config=config, parse_env_config=True)
for name, stream in tap.streams.items():
pkeys = stream.primary_keys or []
schema_props = set(stream.schema["properties"].keys())
for pkey in pkeys:
error_message = (
f"Coding error in stream '{name}': "
f"primary_key '{pkey}' is missing in schema"
)
assert pkey in schema_props, error_message

def _test_state_partitioning_keys_in_schema() -> None:
"""Verify that state partitioning keys are actually in the stream's schema."""
tap = tap_class(config=config, parse_env_config=True)
for name, stream in tap.streams.items():
sp_keys = stream.state_partitioning_keys or []
schema_props = set(stream.schema["properties"].keys())
for sp_key in sp_keys:
assert sp_key in schema_props, (
f"Coding error in stream '{name}': state_partitioning_key "
f"'{sp_key}' is missing in schema"
)

def _test_replication_keys_in_schema() -> None:
"""Verify that the replication key is actually in the stream's schema."""
tap = tap_class(config=config, parse_env_config=True)
for name, stream in tap.streams.items():
rep_key = stream.replication_key
if rep_key is None:
continue
schema_props = set(stream.schema["properties"].keys())
assert rep_key in schema_props, (
f"Coding error in stream '{name}': replication_key "
f"'{rep_key}' is missing in schema"
)

return [
_test_cli_prints,
_test_discovery,
_test_stream_connections,
_test_pkeys_in_schema,
_test_state_partitioning_keys_in_schema,
_test_replication_keys_in_schema,
]


def get_standard_target_tests(
Expand Down

0 comments on commit 76eed0d

Please sign in to comment.