Skip to content

Commit

Permalink
unit and integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrickJin-db committed Dec 11, 2024
1 parent 5ab055f commit 2b96cfe
Show file tree
Hide file tree
Showing 2 changed files with 413 additions and 0 deletions.
256 changes: 256 additions & 0 deletions python/delta_sharing/tests/test_delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,262 @@ def test_load_table_changes(
assert isinstance(e, HTTPError)
assert error in str(e)

@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
@pytest.mark.parametrize(
"fragments,starting_version,ending_version,starting_timestamp,ending_timestamp,error,expected",
[
pytest.param(
"share8.default.cdf_table_cdf_enabled",
0,
3,
None,
None,
None,
pd.DataFrame(
{
"name": ["1", "2", "3", "3", "2", "2"],
"age": pd.Series([1, 2, 3, 3, 2, 2], dtype="int32"),
"birthday": [
date(2020, 1, 1),
date(2020, 1, 1),
date(2020, 1, 1),
date(2020, 1, 1),
date(2020, 1, 1),
date(2020, 2, 2),
],
"_change_type": [
"insert",
"insert",
"insert",
"delete",
"update_preimage",
"update_postimage",
],
"_commit_version": [1, 1, 1, 2, 3, 3],
"_commit_timestamp": [
pd.to_datetime(1651272635000, unit='ms'),
pd.to_datetime(1651272635000, unit='ms'),
pd.to_datetime(1651272635000, unit='ms'),
pd.to_datetime(1651272655000, unit='ms'),
pd.to_datetime(1651272660000, unit='ms'),
pd.to_datetime(1651272660000, unit='ms'),
],
}
),
id="cdf_table_cdf_enabled table changes:[0, 3]",
),
pytest.param(
"share8.default.cdf_table_cdf_enabled",
5,
None,
None,
None,
None,
pd.DataFrame(columns=
[
"name",
"age",
"birthday",
"_change_type",
"_commit_version",
"_commit_timestamp",
]
),
id="cdf_table_cdf_enabled table changes:[5, ]",
),
pytest.param(
"share8.default.cdf_table_cdf_enabled",
None,
None,
"2000-01-01T00:00:00Z",
None,
"Please use a timestamp greater",
pd.DataFrame({"not_used": []}),
id="cdf_table_cdf_enabled table changes with starting_timestamp",
),
pytest.param(
"share8.default.cdf_table_cdf_enabled",
0,
None,
None,
"2100-01-01T00:00:00Z",
"Please use a timestamp less",
pd.DataFrame({"not_used": []}),
id="cdf_table_cdf_enabled table changes with ending_timestamp",
),
pytest.param(
"share1.default.table1",
0,
1,
None,
None,
"cdf is not enabled on table share1.default.table1",
pd.DataFrame({"not_used": []}),
id="table1 table changes not supported",
),
],
)
def test_load_table_changes_kernel(
profile_path: str,
fragments: str,
starting_version: Optional[int],
ending_version: Optional[int],
starting_timestamp: Optional[str],
ending_timestamp: Optional[str],
error: Optional[str],
expected: pd.DataFrame
):
if error is None:
pdf = load_table_changes_as_pandas(
f"{profile_path}#{fragments}",
starting_version,
ending_version,
starting_timestamp,
ending_timestamp,
use_delta_format=True
)
pd.testing.assert_frame_equal(pdf, expected)
else:
try:
load_table_changes_as_pandas(
f"{profile_path}#{fragments}",
starting_version,
ending_version,
starting_timestamp,
ending_timestamp
)
assert False
except Exception as e:
assert isinstance(e, HTTPError)
assert error in str(e)

@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
@pytest.mark.parametrize(
"fragments,starting_version,ending_version,starting_timestamp,ending_timestamp,error,expected",
[
pytest.param(
"share8.default.cdf_table_with_partition",
1,
3,
None,
None,
None,
pd.DataFrame(
{
"name": ["1", "2", "3", "2", "2", "3"],
"age": pd.Series([1, 2, 3, 2, 2, 3], dtype="int32"),
"birthday": [
date(2020, 1, 1),
date(2020, 1, 1),
date(2020, 3, 3),
date(2020, 1, 1),
date(2020, 2, 2),
date(2020, 3, 3),
],
"_change_type": [
"insert",
"insert",
"insert",
"update_preimage",
"update_postimage",
"delete",
],
"_commit_version": [1, 1, 1, 2, 2, 3],
"_commit_timestamp": [
pd.to_datetime(1651614980000, unit='ms'),
pd.to_datetime(1651614980000, unit='ms'),
pd.to_datetime(1651614980000, unit='ms'),
pd.to_datetime(1651614986000, unit='ms'),
pd.to_datetime(1651614986000, unit='ms'),
pd.to_datetime(1651614994000, unit='ms'),
],
}
),
id="cdf_table_with_partition table changes:[0, 3]",
),
pytest.param(
"share8.default.cdf_table_cdf_enabled",
5,
None,
None,
None,
None,
pd.DataFrame(columns=
[
"name",
"age",
"birthday",
"_change_type",
"_commit_version",
"_commit_timestamp",
]
),
id="cdf_table_with_partition table changes:[5, ]",
),
pytest.param(
"share8.default.cdf_table_with_partition",
None,
None,
"2022-05-03T21:56:25Z",
"2022-05-03T21:56:30Z",
None,
pd.DataFrame(
{
"name": ["2", "2"],
"age": pd.Series([2, 2], dtype="int32"),
"birthday": [
date(2020, 1, 1),
date(2020, 2, 2),
],
"_change_type": [
"update_preimage",
"update_postimage",
],
"_commit_version": [2, 2],
"_commit_timestamp": [
pd.to_datetime(1651614986000, unit='ms'),
pd.to_datetime(1651614986000, unit='ms'),
],
}
),
id="cdf_table_with_partition table changes with starting_timestamp and ending_timestamp",
),
],
)
def test_load_table_changes_partition_kernel(
profile_path: str,
fragments: str,
starting_version: Optional[int],
ending_version: Optional[int],
starting_timestamp: Optional[str],
ending_timestamp: Optional[str],
error: Optional[str],
expected: pd.DataFrame
):
if error is None:
pdf = load_table_changes_as_pandas(
f"{profile_path}#{fragments}",
starting_version,
ending_version,
starting_timestamp,
ending_timestamp,
use_delta_format=True
)
pd.testing.assert_frame_equal(pdf, expected)
else:
try:
load_table_changes_as_pandas(
f"{profile_path}#{fragments}",
starting_version,
ending_version,
starting_timestamp,
ending_timestamp
)
assert False
except Exception as e:
assert isinstance(e, HTTPError)
assert error in str(e)


def test_parse_url():
def check_invalid_url(url: str):
Expand Down
Loading

0 comments on commit 2b96cfe

Please sign in to comment.