python connector CDF reads
PatrickJin-db committed Dec 18, 2024
1 parent ac1659c commit 22b77cc
Showing 9 changed files with 881 additions and 53 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/build-kernel-wheels.yml
@@ -15,7 +15,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
os: [ubuntu-latest, ubuntu-20.04, macos-latest, windows-latest]
python-version: [3.8]
arch: [x86_64, arm64]
include:
@@ -25,6 +25,8 @@ jobs:
arch: arm64
- os: ubuntu-latest
arch: x86_64
- os: ubuntu-20.04
arch: x86_64
- os: windows-latest
arch: x86_64

@@ -69,6 +71,13 @@ jobs:
maturin build --release
shell: bash

- name: Build wheel (x86_64 Linux Ubuntu 20.04)
if: matrix.os == 'ubuntu-20.04'
run: |
cd python/delta-kernel-rust-sharing-wrapper
maturin build --release
shell: bash

- name: Upload wheels
uses: actions/upload-artifact@v4
with:
7 changes: 6 additions & 1 deletion python/delta_sharing/delta_sharing.py
@@ -215,7 +215,8 @@ def load_table_changes_as_pandas(
starting_version: Optional[int] = None,
ending_version: Optional[int] = None,
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None
ending_timestamp: Optional[str] = None,
use_delta_format: Optional[bool] = None
) -> pd.DataFrame:
"""
Load the table changes of a shared table as a pandas DataFrame using the given url.
@@ -235,11 +236,15 @@ def load_table_changes_as_pandas(
return DeltaSharingReader(
table=Table(name=table, share=share, schema=schema),
rest_client=DataSharingRestClient(profile),
use_delta_format=use_delta_format
).table_changes_to_pandas(CdfOptions(
starting_version=starting_version,
ending_version=ending_version,
starting_timestamp=starting_timestamp,
ending_timestamp=ending_timestamp,
# when using delta format, we need to get metadata changes and
# handle them properly when replaying the delta log
include_historical_metadata=use_delta_format
))
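
# Usage sketch (annotation, not part of this file): the new flag in action.
# The profile path and share/schema/table coordinates are placeholders.
import delta_sharing

changes = delta_sharing.load_table_changes_as_pandas(
    "/path/to/profile.share#my_share.my_schema.my_table",
    starting_version=1,
    ending_version=5,
    use_delta_format=True,
)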


206 changes: 206 additions & 0 deletions python/delta_sharing/fake_checkpoint.py

Large diffs are not rendered by default.
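
The new module's 206 lines are collapsed above. Based on its use in reader.py below, it exposes a helper that returns the raw bytes of a pre-built Parquet checkpoint; a sketch of that assumed interface:

from delta_sharing.fake_checkpoint import get_fake_checkpoint_byte_array

checkpoint_bytes = get_fake_checkpoint_byte_array()
# The bytes are written verbatim to <version>.checkpoint.parquet in the temp log.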

1 change: 1 addition & 0 deletions python/delta_sharing/protocol.py
@@ -312,3 +312,4 @@ class CdfOptions:
ending_version: Optional[int] = None
starting_timestamp: Optional[str] = None
ending_timestamp: Optional[str] = None
include_historical_metadata: Optional[bool] = None
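
# Construction sketch (annotation, not part of this file): the options object
# with the new field; the version numbers are arbitrary.
options = CdfOptions(
    starting_version=1,
    ending_version=5,
    include_historical_metadata=True,
)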
215 changes: 180 additions & 35 deletions python/delta_sharing/reader.py
@@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Callable, Dict, Optional, Sequence
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Sequence
from urllib.parse import urlparse
from json import loads, dump
from urllib.request import getproxies
@@ -29,6 +30,7 @@
from delta_sharing.converter import to_converters, get_empty_table
from delta_sharing.protocol import AddCdcFile, CdfOptions, FileAction, Table
from delta_sharing.rest_client import DataSharingRestClient
from delta_sharing.fake_checkpoint import get_fake_checkpoint_byte_array


class DeltaSharingReader:
@@ -112,44 +114,11 @@ def __to_pandas_kernel(self):
)

lines = response.lines

# Create a temporary directory using the tempfile module
temp_dir = tempfile.TemporaryDirectory()
delta_log_dir_name = temp_dir.name
table_path = "file:///" + delta_log_dir_name

# Create a new directory named '_delta_log' within the temporary directory
log_dir = os.path.join(delta_log_dir_name, '_delta_log')
os.makedirs(log_dir)

# Create a new .json file within the '_delta_log' directory
json_file_name = "0".zfill(20) + ".json"
json_file_path = os.path.join(log_dir, json_file_name)
json_file = open(json_file_path, 'w+')

# Write the protocol action to the log file
protocol_json = loads(lines.pop(0))
deltaProtocol = {"protocol": protocol_json["protocol"]["deltaProtocol"]}
dump(deltaProtocol, json_file)
json_file.write("\n")

# Write the metadata action to the log file
metadata_json = loads(lines.pop(0))
deltaMetadata = {"metaData": metadata_json["metaData"]["deltaMetadata"]}
dump(deltaMetadata, json_file)
json_file.write("\n")

table_path = self.__write_temp_delta_log_snapshot(temp_dir.name, lines)
num_files = len(lines)

# Write the add file actions to the log file
for line in lines:
line_json = loads(line)
dump(line_json["file"]["deltaSingleAction"], json_file)
json_file.write("\n")

# Close the file
json_file.close()

# Invoke delta-kernel-rust to return the pandas dataframe
interface = delta_kernel_rust_sharing_wrapper.PythonInterface(table_path)
table = delta_kernel_rust_sharing_wrapper.Table(table_path)
@@ -234,7 +203,183 @@ def to_pandas(self) -> pd.DataFrame:

return merged[[col_map[field["name"].lower()] for field in schema_json["fields"]]]

def __write_temp_delta_log_snapshot(self, temp_dir: str, lines: List[str]) -> str:
delta_log_dir_name = temp_dir
table_path = "file:///" + delta_log_dir_name

# Create a new directory named '_delta_log' within the temporary directory
log_dir = os.path.join(delta_log_dir_name, '_delta_log')
os.makedirs(log_dir)

# Create a new .json file within the '_delta_log' directory
json_file_name = "0".zfill(20) + ".json"
json_file_path = os.path.join(log_dir, json_file_name)
json_file = open(json_file_path, 'w+')

# Write the protocol action to the log file
protocol_json = loads(lines.pop(0))
deltaProtocol = {"protocol": protocol_json["protocol"]["deltaProtocol"]}
dump(deltaProtocol, json_file)
json_file.write("\n")

# Write the metadata action to the log file
metadata_json = loads(lines.pop(0))
deltaMetadata = {"metaData": metadata_json["metaData"]["deltaMetadata"]}
dump(deltaMetadata, json_file)
json_file.write("\n")

# Write the add file actions to the log file
for line in lines:
line_json = loads(line)
dump(line_json["file"]["deltaSingleAction"], json_file)
json_file.write("\n")

# Close the file
json_file.close()
return table_path
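
# Annotation (not part of this file): the helper above writes a minimal
# single-commit log. The commit file name is version 0, zero-padded to the
# 20 digits Delta expects:
name = "0".zfill(20) + ".json"
print(name)  # 00000000000000000000.json
# Its contents are newline-delimited actions, in order: one {"protocol": ...},
# one {"metaData": ...}, then one {"add": ...} per remaining response line.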

def __write_temp_delta_log_cdf(
self,
log_dir: str,
delta_protocol: dict,
min_version: int,
max_version: int,
version_to_metadata: Dict[int, Any],
version_to_actions: Dict[int, Any],
version_to_timestamp: Dict[int, int]
):
min_version_file_name = str(min_version).zfill(20) + ".json"
min_version_path = os.path.join(log_dir, min_version_file_name)
with open(min_version_path, 'w+') as min_version_file:
dump(delta_protocol, min_version_file)
min_version_file.write("\n")

num_versions_with_action = len(version_to_actions)
for version in range(min_version, max_version + 1):
log_file_name = str(version).zfill(20) + ".json"
log_file_path = os.path.join(log_dir, log_file_name)
with open(log_file_path, 'a+') as log_file:
if version in version_to_metadata:
dump(version_to_metadata[version], log_file)
log_file.write("\n")
for action in version_to_actions[version]:
dump(action, log_file)
log_file.write("\n")
# Ensure log file modification time matches the version timestamp
# _commit_timestamp of an action is populated by log file modification time
if version in version_to_timestamp:
# os.utime accepts seconds while delta log timestamp is in ms
os.utime(log_file_path, times=(0, version_to_timestamp[version] // 1000))

if min_version > 0 and num_versions_with_action > 0:
# Fake checkpoint so kernel reads logs from the start version
checkpoint_version = min_version - 1
checkpoint_file_name = str(checkpoint_version).zfill(20) + ".checkpoint.parquet"
with open(os.path.join(log_dir, checkpoint_file_name), 'w+b') as checkpoint_file:
checkpoint_file.write(get_fake_checkpoint_byte_array())
checkpoint_file.close()

# Ensure _last_checkpoint points to the fake checkpoint
last_checkpoint_content = \
f'{{"version":{min_version - 1},"size":{len(get_fake_checkpoint_byte_array())}}}'
last_checkpoint_path = os.path.join(log_dir, '_last_checkpoint')
with open(last_checkpoint_path, 'w+') as last_checkpoint_file:
last_checkpoint_file.write(last_checkpoint_content)
last_checkpoint_file.close()
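
# Annotation (not part of this file): concrete numbers for the fake-checkpoint
# trick above. For a CDF read whose earliest change is table version 3:
min_version = 3
checkpoint_name = str(min_version - 1).zfill(20) + ".checkpoint.parquet"
print(checkpoint_name)  # 00000000000000000002.checkpoint.parquet
# _last_checkpoint then records {"version": 2, "size": <fake checkpoint bytes>},
# so the kernel treats versions 0-2 as checkpointed and replays from version 3.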

def __table_changes_to_pandas_kernel(self, cdfOptions: CdfOptions) -> pd.DataFrame:
self._rest_client.set_delta_format_header()
response = self._rest_client.list_table_changes(self._table, cdfOptions)
lines = response.lines

# first line is protocol
protocol_json = loads(lines.pop(0))
delta_protocol = {"protocol": protocol_json["protocol"]["deltaProtocol"]}
start_version = cdfOptions.starting_version

min_version = start_version if start_version is not None else (10**20 - 1)
max_version = 0
version_to_actions = defaultdict(list)
version_to_metadata = {}
version_to_timestamp = {}

# Construct map from version to actions that took place in that version
line_count = 1
for line in lines:
line_count += 1
line_json = loads(line)
if "file" in line_json:
file = line_json["file"]
action = file["deltaSingleAction"]
version = file["version"]
min_version = min(min_version, version)
max_version = max(max_version, version)
version_to_timestamp[version] = file["timestamp"]
version_to_actions[version].append(action)
elif "metaData" in line_json:
metadata = line_json["metaData"]
delta_metadata = {"metaData": metadata["deltaMetadata"]}
version = metadata["version"]
min_version = min(min_version, version)
max_version = max(max_version, version)
version_to_metadata[version] = delta_metadata
else:
raise Exception(
f"Invalid JSON object:\n{line}\nIs neither metadata nor file."
)

num_versions_with_action = len(version_to_actions)
print(
f"table_changes stats: min_version={min_version}, "
f"max_version={max_version}, "
f"num_versions_with_action={num_versions_with_action}, "
f"num_versions_with_metadata={len(version_to_metadata)}, "
f"lines_in_response={line_count}, "
)
# Create a temporary directory using the tempfile module
temp_dir = tempfile.TemporaryDirectory()
try:
delta_log_dir_name = temp_dir.name
table_path = "file:///" + delta_log_dir_name

# Create a new directory named '_delta_log' within the temporary directory
log_dir = os.path.join(delta_log_dir_name, '_delta_log')
os.makedirs(log_dir)
self.__write_temp_delta_log_cdf(
log_dir,
delta_protocol,
min_version,
max_version,
version_to_metadata,
version_to_actions,
version_to_timestamp,
)

# Invoke delta-kernel-rust to return the pandas dataframe
interface = delta_kernel_rust_sharing_wrapper.PythonInterface(table_path)
table = delta_kernel_rust_sharing_wrapper.Table(table_path)
scan = delta_kernel_rust_sharing_wrapper.TableChangesScanBuilder(
table, interface, min_version, max_version
).build()

if num_versions_with_action == 0:
schema = scan.execute(interface).schema
result = pd.DataFrame(columns=schema.names)
else:
table = pa.Table.from_batches(scan.execute(interface))
result = table.to_pandas()
finally:
# Delete the temp folder explicitly and remove the delta format from header
temp_dir.cleanup()
self._rest_client.remove_delta_format_header()

return result
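
# Annotation (not part of this file): shape of the delta-format response lines
# that the loop above buckets by version; all field values are invented.
line_json = {
    "file": {
        "version": 3,
        "timestamp": 1702000000000,  # milliseconds since the epoch
        "deltaSingleAction": {"add": {"path": "part-00000.snappy.parquet"}},
    }
}
# "metaData" lines instead carry {"version": ..., "deltaMetadata": {...}}.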

def table_changes_to_pandas(self, cdfOptions: CdfOptions) -> pd.DataFrame:
# Only use delta format if explicitly specified
if self._use_delta_format:
return self.__table_changes_to_pandas_kernel(cdfOptions)

response = self._rest_client.list_table_changes(self._table, cdfOptions)

schema_json = loads(response.metadata.schema_string)
38 changes: 27 additions & 11 deletions python/delta_sharing/rest_client.py
@@ -91,6 +91,7 @@ class ListTableChangesResponse:
protocol: Protocol
metadata: Metadata
actions: Sequence[FileAction]
lines: Sequence[str]


def retry_with_exponential_backoff(func):
Expand Down Expand Up @@ -413,20 +414,35 @@ def list_table_changes(self, table: Table, cdfOptions: CdfOptions) -> ListTableC
params.append(f"endingVersion={cdfOptions.ending_version}")
if cdfOptions.ending_timestamp is not None:
params.append(f"endingTimestamp={quote(cdfOptions.ending_timestamp)}")
if cdfOptions.include_historical_metadata is not None:
params.append(f"includeHistoricalMetadata={cdfOptions.include_historical_metadata}")
query_str += "&".join(params)

with self._get_internal(query_str) as lines:
protocol_json = json.loads(next(lines))
metadata_json = json.loads(next(lines))
actions: List[FileAction] = []
for line in lines:
actions.append(FileAction.from_json(json.loads(line)))
with self._get_internal(query_str, return_headers=True) as (headers, lines):
if DataSharingRestClient.DELTA_TABLE_VERSION_HEADER not in headers:
raise LookupError("Missing delta-table-version header")

return ListTableChangesResponse(
protocol=Protocol.from_json(protocol_json["protocol"]),
metadata=Metadata.from_json(metadata_json["metaData"]),
actions=actions,
)
if (DataSharingRestClient.CAPABILITIES_HEADER in headers and
"responseformat=delta" in headers[DataSharingRestClient.CAPABILITIES_HEADER]):
return ListTableChangesResponse(
protocol=None,
metadata=None,
actions=None,
lines=list(lines),
)
else:
protocol_json = json.loads(next(lines))
metadata_json = json.loads(next(lines))
actions: List[FileAction] = []
for line in lines:
actions.append(FileAction.from_json(json.loads(line)))

return ListTableChangesResponse(
protocol=Protocol.from_json(protocol_json["protocol"]),
metadata=Metadata.from_json(metadata_json["metaData"]),
actions=actions,
lines=None
)

def close(self):
self._session.close()
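
# Annotation (not part of this file): a caller now branches on which response
# shape came back; rough sketch with client setup omitted and hypothetical
# handler helpers.
response = rest_client.list_table_changes(table, cdf_options)
if response.lines is not None:
    # Delta response format: raw NDJSON lines for the kernel-based reader.
    for line in response.lines:
        handle_delta_line(line)
else:
    # Classic format: parsed protocol, metadata, and file actions.
    for action in response.actions:
        handle_action(action)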