diff --git a/python/delta_sharing/tests/test_delta_sharing.py b/python/delta_sharing/tests/test_delta_sharing.py index 65cdc3300..a89de9a3f 100644 --- a/python/delta_sharing/tests/test_delta_sharing.py +++ b/python/delta_sharing/tests/test_delta_sharing.py @@ -1104,6 +1104,262 @@ def test_load_table_changes( assert isinstance(e, HTTPError) assert error in str(e) +@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +@pytest.mark.parametrize( + "fragments,starting_version,ending_version,starting_timestamp,ending_timestamp,error,expected", + [ + pytest.param( + "share8.default.cdf_table_cdf_enabled", + 0, + 3, + None, + None, + None, + pd.DataFrame( + { + "name": ["1", "2", "3", "3", "2", "2"], + "age": pd.Series([1, 2, 3, 3, 2, 2], dtype="int32"), + "birthday": [ + date(2020, 1, 1), + date(2020, 1, 1), + date(2020, 1, 1), + date(2020, 1, 1), + date(2020, 1, 1), + date(2020, 2, 2), + ], + "_change_type": [ + "insert", + "insert", + "insert", + "delete", + "update_preimage", + "update_postimage", + ], + "_commit_version": [1, 1, 1, 2, 3, 3], + "_commit_timestamp": [ + pd.to_datetime(1651272635000, unit='ms'), + pd.to_datetime(1651272635000, unit='ms'), + pd.to_datetime(1651272635000, unit='ms'), + pd.to_datetime(1651272655000, unit='ms'), + pd.to_datetime(1651272660000, unit='ms'), + pd.to_datetime(1651272660000, unit='ms'), + ], + } + ), + id="cdf_table_cdf_enabled table changes:[0, 3]", + ), + pytest.param( + "share8.default.cdf_table_cdf_enabled", + 5, + None, + None, + None, + None, + pd.DataFrame(columns= + [ + "name", + "age", + "birthday", + "_change_type", + "_commit_version", + "_commit_timestamp", + ] + ), + id="cdf_table_cdf_enabled table changes:[5, ]", + ), + pytest.param( + "share8.default.cdf_table_cdf_enabled", + None, + None, + "2000-01-01T00:00:00Z", + None, + "Please use a timestamp greater", + pd.DataFrame({"not_used": []}), + id="cdf_table_cdf_enabled table changes with starting_timestamp", + ), + pytest.param( + "share8.default.cdf_table_cdf_enabled", + 0, + None, + None, + "2100-01-01T00:00:00Z", + "Please use a timestamp less", + pd.DataFrame({"not_used": []}), + id="cdf_table_cdf_enabled table changes with ending_timestamp", + ), + pytest.param( + "share1.default.table1", + 0, + 1, + None, + None, + "cdf is not enabled on table share1.default.table1", + pd.DataFrame({"not_used": []}), + id="table1 table changes not supported", + ), + ], +) +def test_load_table_changes_kernel( + profile_path: str, + fragments: str, + starting_version: Optional[int], + ending_version: Optional[int], + starting_timestamp: Optional[str], + ending_timestamp: Optional[str], + error: Optional[str], + expected: pd.DataFrame +): + if error is None: + pdf = load_table_changes_as_pandas( + f"{profile_path}#{fragments}", + starting_version, + ending_version, + starting_timestamp, + ending_timestamp, + use_delta_format=True + ) + pd.testing.assert_frame_equal(pdf, expected) + else: + try: + load_table_changes_as_pandas( + f"{profile_path}#{fragments}", + starting_version, + ending_version, + starting_timestamp, + ending_timestamp + ) + assert False + except Exception as e: + assert isinstance(e, HTTPError) + assert error in str(e) + +@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +@pytest.mark.parametrize( + "fragments,starting_version,ending_version,starting_timestamp,ending_timestamp,error,expected", + [ + pytest.param( + "share8.default.cdf_table_with_partition", + 1, + 3, + None, + None, + None, + pd.DataFrame( + { + "name": ["1", "2", "3", "2", "2", "3"], + "age": pd.Series([1, 2, 3, 2, 2, 3], dtype="int32"), + "birthday": [ + date(2020, 1, 1), + date(2020, 1, 1), + date(2020, 3, 3), + date(2020, 1, 1), + date(2020, 2, 2), + date(2020, 3, 3), + ], + "_change_type": [ + "insert", + "insert", + "insert", + "update_preimage", + "update_postimage", + "delete", + ], + "_commit_version": [1, 1, 1, 2, 2, 3], + "_commit_timestamp": [ + pd.to_datetime(1651614980000, unit='ms'), + pd.to_datetime(1651614980000, unit='ms'), + pd.to_datetime(1651614980000, unit='ms'), + pd.to_datetime(1651614986000, unit='ms'), + pd.to_datetime(1651614986000, unit='ms'), + pd.to_datetime(1651614994000, unit='ms'), + ], + } + ), + id="cdf_table_with_partition table changes:[0, 3]", + ), + pytest.param( + "share8.default.cdf_table_cdf_enabled", + 5, + None, + None, + None, + None, + pd.DataFrame(columns= + [ + "name", + "age", + "birthday", + "_change_type", + "_commit_version", + "_commit_timestamp", + ] + ), + id="cdf_table_with_partition table changes:[5, ]", + ), + pytest.param( + "share8.default.cdf_table_with_partition", + None, + None, + "2022-05-03T21:56:25Z", + "2022-05-03T21:56:30Z", + None, + pd.DataFrame( + { + "name": ["2", "2"], + "age": pd.Series([2, 2], dtype="int32"), + "birthday": [ + date(2020, 1, 1), + date(2020, 2, 2), + ], + "_change_type": [ + "update_preimage", + "update_postimage", + ], + "_commit_version": [2, 2], + "_commit_timestamp": [ + pd.to_datetime(1651614986000, unit='ms'), + pd.to_datetime(1651614986000, unit='ms'), + ], + } + ), + id="cdf_table_with_partition table changes with starting_timestamp and ending_timestamp", + ), + ], +) +def test_load_table_changes_partition_kernel( + profile_path: str, + fragments: str, + starting_version: Optional[int], + ending_version: Optional[int], + starting_timestamp: Optional[str], + ending_timestamp: Optional[str], + error: Optional[str], + expected: pd.DataFrame +): + if error is None: + pdf = load_table_changes_as_pandas( + f"{profile_path}#{fragments}", + starting_version, + ending_version, + starting_timestamp, + ending_timestamp, + use_delta_format=True + ) + pd.testing.assert_frame_equal(pdf, expected) + else: + try: + load_table_changes_as_pandas( + f"{profile_path}#{fragments}", + starting_version, + ending_version, + starting_timestamp, + ending_timestamp + ) + assert False + except Exception as e: + assert isinstance(e, HTTPError) + assert error in str(e) + def test_parse_url(): def check_invalid_url(url: str): diff --git a/python/delta_sharing/tests/test_reader.py b/python/delta_sharing/tests/test_reader.py index bf00e0c3b..335a800bd 100644 --- a/python/delta_sharing/tests/test_reader.py +++ b/python/delta_sharing/tests/test_reader.py @@ -505,3 +505,160 @@ def list_table_changes( assert pdf.columns.values[2] == DeltaSharingReader._change_type_col_name() assert pdf.columns.values[3] == DeltaSharingReader._commit_version_col_name() assert pdf.columns.values[4] == DeltaSharingReader._commit_timestamp_col_name() + +def test_table_changes_to_pandas_non_partitioned_delta(tmp_path): + # Create basic data frame. + pdf1 = pd.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}) + pdf2 = pd.DataFrame({"a": [4, 5, 6], "b": ["d", "e", "f"]}) + pdf3 = pd.DataFrame({"a": [7, 8, 9], "b": ["x", "y", "z"]}) + pdf4 = pd.DataFrame({"a": [7, 8, 9], "b": ["x", "y", "z"]}) + + # Add change type (which is present in the parquet files). + pdf3[DeltaSharingReader._change_type_col_name()] = "update_preimage" + pdf4[DeltaSharingReader._change_type_col_name()] = "update_postimage" + + # Save. + pdf1.to_parquet(tmp_path / "pdf1.parquet") + pdf2.to_parquet(tmp_path / "pdf2.parquet") + pdf3.to_parquet(tmp_path / "pdf3.parquet") + pdf4.to_parquet(tmp_path / "pdf4.parquet") + + # Version and timestamp are not in the parquet files; but are expected by the conversion. + timestamp1 = 1652110000000 + timestamp2 = 1652120000000 + timestamp3 = 1652130000000 + timestamp4 = 1652140000000 + + version1 = 1 + version2 = 2 + version3 = 3 + version4 = 4 + + # The change type is also expected for add/remove actions. + pdf1[DeltaSharingReader._change_type_col_name()] = "insert" + pdf2[DeltaSharingReader._change_type_col_name()] = "delete" + + pdf1[DeltaSharingReader._commit_version_col_name()] = version1 + pdf2[DeltaSharingReader._commit_version_col_name()] = version2 + pdf3[DeltaSharingReader._commit_version_col_name()] = version3 + pdf4[DeltaSharingReader._commit_version_col_name()] = version4 + + pdf1[DeltaSharingReader._commit_timestamp_col_name()] = timestamp1 + pdf2[DeltaSharingReader._commit_timestamp_col_name()] = timestamp2 + pdf3[DeltaSharingReader._commit_timestamp_col_name()] = timestamp3 + pdf4[DeltaSharingReader._commit_timestamp_col_name()] = timestamp4 + + class RestClientMock: + def list_table_changes( + self, table: Table, cdfOptions: CdfOptions + ) -> ListTableChangesResponse: + assert table == Table("table_name", "share_name", "schema_name") + + schema_string = ( + '{"fields":[' + '{"metadata":{},"name":"a","nullable":true,"type":"long"},' + '{"metadata":{},"name":"b","nullable":true,"type":"string"}' + '],"type":"struct"}' + ).replace('"',r'\"') + lines = [ + f'''{{ + "protocol": {{ + "deltaProtocol": {{ + "minReaderVersion": 2, + "minWriterVersion": 6 + }} + }} + }}''', + f'''{{ + "metaData":{{ + "version":{version1}, + "deltaMetadata":{{ + "id":"some-table-id", + "format": {{ + "provider": "parquet", + "options": {{}} + }}, + "schemaString":"{schema_string}", + "partitionColumns":[], + "configuration":{{ + "delta.enableChangeDataFeed": "true" + }} + }} + }} + }}''', + f'''{{ + "file":{{ + "id":"pdf1", + "version":{version1}, + "timestamp":{timestamp1}, + "deltaSingleAction":{{ + "add":{{ + "path":"{str(tmp_path / "pdf1.parquet")}", + "partitionValues":{{}}, + "modificationTime":{timestamp1}, + "dataChange": true, + "size":0 + }} + }} + }} + }}''', + f'''{{ + "file":{{ + "id":"pdf2", + "version":{version2}, + "timestamp":{timestamp2}, + "deltaSingleAction":{{ + "remove":{{ + "path":"{str(tmp_path / "pdf2.parquet")}", + "partitionValues":{{}}, + "dataChange": true, + "size":0 + }} + }} + }} + }}''', + f'''{{ + "file":{{ + "id":"pdf3", + "version":{version3}, + "timestamp":{timestamp3}, + "deltaSingleAction":{{ + "cdc":{{ + "path":"{str(tmp_path / "pdf3.parquet")}", + "partitionValues":{{}}, + "dataChange": false, + "size":0 + }} + }} + }} + }}''', + f'''{{ + "file":{{ + "id":"pdf4", + "version":{version4}, + "timestamp":{timestamp4}, + "deltaSingleAction":{{ + "cdc":{{ + "path":"{str(tmp_path / "pdf4.parquet")}", + "partitionValues":{{}}, + "dataChange": false, + "size":0 + }} + }} + }} + }}''' + ] + return ListTableChangesResponse(protocol=None, metadata=None, actions=None, lines=lines) + + def set_delta_format_header(self): + return + + def remove_delta_format_header(self): + return + + reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock(), use_delta_format=True) + pdf = reader.table_changes_to_pandas(CdfOptions()) + + expected = pd.concat([pdf1, pdf2, pdf3, pdf4]).reset_index(drop=True) + pd.testing.assert_frame_equal(pdf, expected) +