fixed metadata handling and log file modification timestamps
PatrickJin-db committed Dec 10, 2024
1 parent c401e39 commit 5ab055f
Showing 4 changed files with 42 additions and 25 deletions.
1 change: 1 addition & 0 deletions python/delta_sharing/delta_sharing.py
@@ -242,6 +242,7 @@ def load_table_changes_as_pandas(
ending_version=ending_version,
starting_timestamp=starting_timestamp,
ending_timestamp=ending_timestamp,
include_historical_metadata=use_delta_format
))


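The one-line change above threads the caller's `use_delta_format` flag into `CdfOptions` as `include_historical_metadata`, so delta-format CDF queries also request the metadata actions for each version. A hedged usage sketch, assuming `use_delta_format` is exposed as a keyword argument of the public `load_table_changes_as_pandas` entry point (the profile path and table coordinates are purely illustrative):

```python
import delta_sharing

# Hypothetical profile file and table coordinates, for illustration only.
table_url = "profile.share#share.schema.table"

# With use_delta_format=True the reader asks the server to include
# historical metadata actions alongside the file actions.
changes = delta_sharing.load_table_changes_as_pandas(
    table_url,
    starting_version=0,
    ending_version=5,
    use_delta_format=True,
)
print(changes.head())
```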
1 change: 1 addition & 0 deletions python/delta_sharing/protocol.py
@@ -312,3 +312,4 @@ class CdfOptions:
ending_version: Optional[int] = None
starting_timestamp: Optional[str] = None
ending_timestamp: Optional[str] = None
include_historical_metadata: Optional[bool] = None
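`CdfOptions` gains an optional `include_historical_metadata` field; leaving it at `None` preserves the previous behaviour, while an explicit boolean is forwarded as a query parameter (see the `rest_client.py` hunk below). A minimal construction sketch, assuming `CdfOptions` is the dataclass shown here:

```python
from delta_sharing.protocol import CdfOptions

# include_historical_metadata defaults to None, in which case no
# includeHistoricalMetadata parameter is sent to the server.
options = CdfOptions(
    starting_version=0,
    ending_version=5,
    include_historical_metadata=True,
)
```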
63 changes: 38 additions & 25 deletions python/delta_sharing/reader.py
@@ -238,7 +238,7 @@ def __write_temp_delta_log_snapshot(self, temp_dir: str, lines: Sequence[str]) -
json_file.close()
return table_path

def __write_temp_delta_log_cdf(self, temp_dir: str, lines: Sequence[str], start_version: Optional[int]) -> tuple[str, int, int]:
def __write_temp_delta_log_cdf(self, temp_dir: str, lines: Sequence[str], start_version: Optional[int]) -> tuple[str, int, int, int]:
# TODO: use with when opening files
delta_log_dir_name = temp_dir
table_path = "file:///" + delta_log_dir_name
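The helper materialises the shared CDF response as a local Delta log inside a temporary directory so that delta-kernel-rust can read it like an ordinary table: commit files live under `_delta_log/` and are named by zero-padded version. A rough sketch of that layout, with the version number chosen only for illustration:

```python
import os
import tempfile

temp_dir = tempfile.TemporaryDirectory()
table_path = "file:///" + temp_dir.name            # table root as a file URI
log_dir = os.path.join(temp_dir.name, "_delta_log")
os.makedirs(log_dir, exist_ok=True)

# Version 3 of the table would be written as 00000000000000000003.json
version_file = os.path.join(log_dir, str(3).zfill(20) + ".json")
open(version_file, "w").close()
```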
@@ -247,27 +247,36 @@ def __write_temp_delta_log_cdf(self, temp_dir: str, lines: Sequence[str], start_
log_dir = os.path.join(delta_log_dir_name, '_delta_log')
os.makedirs(log_dir)

# first two lines are protocol and metadata, respectively
# first line is protocol
protocol_json = loads(lines.pop(0))
deltaProtocol = {"protocol": protocol_json["protocol"]["deltaProtocol"]}
metadata_json = loads(lines.pop(0))
deltaMetadata = {"metaData": metadata_json["metaData"]["deltaMetadata"]}

min_version = start_version if start_version is not None else (10**20 - 1)
max_version = 0
version_to_actions = defaultdict(list)
version_to_metadata = {}
version_to_timestamp = {}

# Construct map from version to actions that took place in that version
for line in lines:
line_json = loads(line)
file = line_json["file"]
action = file["deltaSingleAction"]
version = file["version"]
min_version = min(min_version, version)
max_version = max(max_version, version)
version_to_timestamp[version] = file["timestamp"]
version_to_actions[version].append(action)
if "file" in line_json:
file = line_json["file"]
action = file["deltaSingleAction"]
version = file["version"]
min_version = min(min_version, version)
max_version = max(max_version, version)
version_to_timestamp[version] = file["timestamp"]
version_to_actions[version].append(action)
elif "metaData" in line_json:
metadata = line_json["metaData"]
delta_metadata = {"metaData": metadata["deltaMetadata"]}
version = metadata["version"]
min_version = min(min_version, version)
max_version = max(max_version, version)
version_to_metadata[version] = delta_metadata
else:
raise Exception(f"Invalid JSON object:\n{line}\nIs neither metadata nor file.")

# starting version file needs to have metadata and protocol
min_version_file_name = str(min_version).zfill(20) + ".json"
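The parsing loop now distinguishes two kinds of response lines: `file` wrappers carrying a `deltaSingleAction` together with its `version` and `timestamp`, and `metaData` wrappers carrying a `deltaMetadata` for the version where the metadata changed; anything else raises. A sketch of the shapes it expects, where the field names mirror the diff above but the values are invented:

```python
from collections import defaultdict
from json import loads

lines = [
    '{"file": {"deltaSingleAction": {"add": {"path": "part-0.parquet"}}, '
    '"version": 1, "timestamp": 1733800000000}}',
    '{"metaData": {"deltaMetadata": {"schemaString": "..."}, "version": 2}}',
]

version_to_actions = defaultdict(list)
version_to_metadata = {}
for line in lines:
    obj = loads(line)
    if "file" in obj:
        file = obj["file"]
        version_to_actions[file["version"]].append(file["deltaSingleAction"])
    elif "metaData" in obj:
        meta = obj["metaData"]
        version_to_metadata[meta["version"]] = {"metaData": meta["deltaMetadata"]}
```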
@@ -276,24 +276,28 @@ def __write_temp_delta_log_cdf(self, temp_dir: str, lines: Sequence[str], start_

dump(deltaProtocol, min_version_file)
min_version_file.write("\n")
dump(deltaMetadata, min_version_file)
min_version_file.write("\n")
min_version_file.close()

num_versions_with_action = len(version_to_actions)
# Create log files
for version in range(min_version, max_version + 1):
log_file_name = str(version).zfill(20) + ".json"
log_file_path = os.path.join(log_dir, log_file_name)
log_file = open(log_file_path, 'a+')
if version in version_to_metadata:
dump(version_to_metadata[version], log_file)
log_file.write("\n")
for action in version_to_actions[version]:
# ensure deletionTimestamp is populated for remove
if "remove" in action:
action["deletionTimestamp"] = version_to_timestamp[version]
dump(action, log_file)
log_file.write("\n")
log_file.close()
# Ensure log file modification time matches the version timestamp
# _commit_timestamp of an action is populated by log file modification time
if version in version_to_timestamp:
# os.utime accepts seconds while delta log timestamp is in ms
os.utime(log_file_path, times=(0, version_to_timestamp[version] // 1000))

if min_version > 0:
if min_version > 0 and num_versions_with_action > 0:
# Fake checkpoint so kernel reads logs from the start version
checkpoint_version = min_version - 1
checkpoint_file_name = str(checkpoint_version).zfill(20) + ".checkpoint.parquet"
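This is the timestamp half of the fix: delta-kernel-rust derives each action's `_commit_timestamp` from the modification time of the commit file it came from, so every generated log file is now stamped with that version's timestamp. `os.utime` takes seconds while Delta log timestamps are in milliseconds, hence the integer division. A minimal sketch with an illustrative file and timestamp:

```python
import os
import tempfile

# Create a stand-in commit file to stamp.
log_dir = tempfile.mkdtemp()
log_file_path = os.path.join(log_dir, "00000000000000000003.json")
open(log_file_path, "w").close()

commit_timestamp_ms = 1733800000000            # milliseconds, as in the Delta log
# os.utime expects (atime, mtime) in seconds.
os.utime(log_file_path, times=(0, commit_timestamp_ms // 1000))
print(os.path.getmtime(log_file_path))         # 1733800000.0
```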
@@ -308,28 +308,28 @@ def __write_temp_delta_log_cdf(self, temp_dir: str, lines: Sequence[str], start_
last_checkpoint_file.write(last_checkpoint_content)
last_checkpoint_file.close()

return table_path, min_version, max_version
return table_path, min_version, max_version, num_versions_with_action

def __table_changes_to_pandas_kernel(self, cdfOptions: CdfOptions) -> pd.DataFrame:
self._rest_client.set_delta_format_header()
response = self._rest_client.list_table_changes(self._table, cdfOptions)

lines = response.lines
# Create a temporary directory using the tempfile module
temp_dir = tempfile.TemporaryDirectory()
table_path, min_version, max_version = self.__write_temp_delta_log_cdf(temp_dir.name, lines, cdfOptions.starting_version)
table_path, min_version, max_version, num_versions_with_action = \
self.__write_temp_delta_log_cdf(temp_dir.name, response.lines, cdfOptions.starting_version)

# Invoke delta-kernel-rust to return the pandas dataframe
interface = delta_kernel_rust_sharing_wrapper.PythonInterface(table_path)
table = delta_kernel_rust_sharing_wrapper.Table(table_path)
scan = delta_kernel_rust_sharing_wrapper.TableChangesScanBuilder(table, interface, min_version, max_version).build()

if (len(lines) == 0):
if (num_versions_with_action == 0):
schema = scan.execute(interface).schema
return pd.DataFrame(columns = schema.names)

table = pa.Table.from_batches(scan.execute(interface))
result = table.to_pandas()
result = pd.DataFrame(columns = schema.names)
else:
table = pa.Table.from_batches(scan.execute(interface))
result = table.to_pandas()

# Delete the temp folder explicitly and remove the delta format from header
temp_dir.cleanup()
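With the helper now reporting how many versions actually contained file actions, the empty case builds a DataFrame from the scan's column names instead of special-casing on the raw response lines. A small illustration of the two branches using plain pyarrow objects (the record batch below merely stands in for whatever `scan.execute(interface)` would yield):

```python
import pandas as pd
import pyarrow as pa

# Stand-in for the kernel scan output: an iterable of Arrow record batches.
batches = [pa.RecordBatch.from_pydict({"id": [1, 2], "_commit_version": [3, 3]})]
schema = batches[0].schema

num_versions_with_action = len(batches)        # illustrative
if num_versions_with_action == 0:
    result = pd.DataFrame(columns=schema.names)          # empty frame, right columns
else:
    result = pa.Table.from_batches(batches).to_pandas()
print(result)
```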
2 changes: 2 additions & 0 deletions python/delta_sharing/rest_client.py
@@ -414,6 +414,8 @@ def list_table_changes(self, table: Table, cdfOptions: CdfOptions) -> ListTableC
params.append(f"endingVersion={cdfOptions.ending_version}")
if cdfOptions.ending_timestamp is not None:
params.append(f"endingTimestamp={quote(cdfOptions.ending_timestamp)}")
if cdfOptions.include_historical_metadata is not None:
params.append(f"includeHistoricalMetadata={cdfOptions.include_historical_metadata}")
query_str += "&".join(params)

with self._get_internal(query_str, return_headers=True) as (headers, lines):
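On the wire, the new option is just one more query parameter appended next to the version and timestamp bounds. A sketch of how the query string comes together, following the params-list pattern in the diff (the endpoint path and values are illustrative):

```python
from urllib.parse import quote

starting_version = 0
ending_timestamp = "2024-12-10T00:00:00Z"
include_historical_metadata = True

params = [f"startingVersion={starting_version}"]
params.append(f"endingTimestamp={quote(ending_timestamp)}")
if include_historical_metadata is not None:
    params.append(f"includeHistoricalMetadata={include_historical_metadata}")

query_str = "changes?" + "&".join(params)
# changes?startingVersion=0&endingTimestamp=2024-12-10T00%3A00%3A00Z&includeHistoricalMetadata=True
```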
