cleanup_metadata not respecting custom logRetentionDuration #2180

Closed
liamphmurphy opened this issue Feb 11, 2024 · 8 comments · Fixed by #2250
Labels
binding/python Issues for the Python package bug Something isn't working

Comments

@liamphmurphy
Contributor

Environment

Delta-rs version: python-v0.15.3

Binding: Python

Environment:

  • Other: Local File System (and S3)

Bug

What happened:
Whether I create a table with write_deltalake or alter an existing Delta table using Spark, subsequent cleanup_metadata() calls do not respect a custom log retention duration. In fact, when a custom value is provided, log files older than the default 30 days do not seem to be deleted either.

What you expected to happen:
cleanup_metadata would delete log files older than the custom log retention value; see below.

How to reproduce it:

First, I'm writing a test Delta Table like so:

import pandas as pd
import numpy as np
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
import pyarrow.dataset as ds
num_table_writes = 500

data = {'id': range(0, num_table_writes), 'value': np.random.rand(num_table_writes), 'random_string': [str(np.random.rand()) for _ in range(num_table_writes)]}
df = pd.DataFrame(data)

path_table1 = "./tables/delta_table_merge"

pa_table = pa.Table.from_pandas(df)


def chunk_table(table, chunk_size):
    for i in range(0, len(table), chunk_size):
        yield table.slice(i, chunk_size)

chunk_size = 100 

print("writing data to Delta tables")
for chunk in chunk_table(pa_table, chunk_size):
    # Setting both log retention values, as I'm not sure which one delta-rs respects?
    write_deltalake(path_table1, chunk, configuration={"delta.logRetentionDuration": "2 days", "logRetentionDuration": "2 days"}, mode="append", partition_by=["random_string"])

table = DeltaTable(path_table1)
print(table.metadata())

Then, to simulate a log file getting old, I modify the first log file like so (making the last modified time for the file roughly a year old):

touch -t 202301150101 tables/delta_table_merge/_delta_log/00000000000000000000.json

Then:

dt = DeltaTable("./tables/delta_table_merge/")
dt.cleanup_metadata()

And nothing happens, the first log file is not deleted.
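For a scripted repro, the touch -t step above can also be done from Python with os.utime. This is only a sketch; the helper name backdate is made up for illustration, and it only applies to the local file system case.

```python
import os
from datetime import datetime


def backdate(path: str, when: datetime) -> None:
    """Set the access and modification times of `path` to `when` (local time)."""
    ts = when.timestamp()
    os.utime(path, (ts, ts))


# Roughly equivalent to: touch -t 202301150101 <log file>
# backdate(
#     "tables/delta_table_merge/_delta_log/00000000000000000000.json",
#     datetime(2023, 1, 15, 1, 1),
# )
```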

More details:

As an aside, if I alter the table using Spark with the script below, and do so 10 times (producing 10 transaction log entries and presumably triggering the default metadata cleanup that runs at the checkpoint interval), it does correctly delete log files older than 2 days:

from pyspark.sql import SparkSession
from delta import *

localDeltaTablePath = "file:////Users/user/scratch/testing-delta/tables/delta_table_merge"

spark = SparkSession.builder \
    .appName("DeltaLogRetentionChange") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-aws:3.3.4") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.read.format("delta").load(localDeltaTablePath)

df.show()
sqlCommand = f"ALTER TABLE delta.`{localDeltaTablePath}` SET TBLPROPERTIES ('logRetentionDuration'='5 days')"
#spark.sql(sqlCommand)

# After the 10th operation, the old log gets deleted (after a manual touch -t command)
for _ in range(15):
    spark.sql(sqlCommand)
@liamphmurphy liamphmurphy added the bug Something isn't working label Feb 11, 2024
@ion-elgreco
Collaborator

@liamphmurphy spark creates checkpoints by default at 10 commits, we don't do that yet

Can you create a checkpoint first before executing clean up metadata?

@liamphmurphy
Contributor Author

@liamphmurphy spark creates checkpoints by default at 10 commits, we don't do that yet

Can you create a checkpoint first before executing clean up metadata?

@ion-elgreco Hi, good callout. Here's what I discovered after adding a create_checkpoint call before cleanup_metadata.

Using the same 2 days value as above, if I set the last modified date of the file to Feb 1st 2024 (11 days ago as of the time of writing), the log file is not deleted. However, if I set it to Jan 10th (touch -t 202401100101 tables/delta_table_merge/_delta_log/00000000000000000000.json), it is deleted. It seems like it's still using the default 30 days, or I'm setting the value incorrectly?
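Those two observations are what a 30-day cutoff would produce, not a 2-day one. A quick check of the arithmetic, assuming "now" is Feb 11, 2024 and that a log file becomes eligible once its modification time is older than now minus the retention period (a simplification, not necessarily the exact rule delta-rs applies):

```python
from datetime import datetime, timedelta

now = datetime(2024, 2, 11)  # assumed "now": the date of this comment


def is_expired(mtime: datetime, retention: timedelta, now: datetime = now) -> bool:
    """True if a file last modified at `mtime` is older than the retention cutoff."""
    return mtime < now - retention


feb_1 = datetime(2024, 2, 1, 1, 1)    # touch -t 202402010101
jan_10 = datetime(2024, 1, 10, 1, 1)  # touch -t 202401100101

for label, retention in [("2 days", timedelta(days=2)), ("30 days", timedelta(days=30))]:
    print(label, is_expired(feb_1, retention), is_expired(jan_10, retention))
# 2 days True True
# 30 days False True
```

Only the 30-day row matches what was seen (Feb 1st kept, Jan 10th deleted).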

@ion-elgreco
Collaborator

@liamphmurphy we for some reason use the singular format of days->day.

If you want you can open a PR to also parse the plural format:) it's a very trivial change, but haven't found the time yet

@liamphmurphy
Contributor Author

liamphmurphy commented Feb 11, 2024

@liamphmurphy we for some reason use the singular format of days->day.

If you want you can open a PR to also parse the plural format:) it's a very trivial change, but haven't found the time yet

Would you mind pointing to me where this logic is done? I'm trying 2 day, and still seeing the same behavior.

EDIT: Okay, I figured out that the exact format it's expecting is something like interval 2 day or interval 7 day.

@liamphmurphy
Contributor Author

liamphmurphy commented Feb 11, 2024

^ And in case anyone comes across the same issue: I'm also seeing that, for now, you need to pass delta.logRetentionDuration; just logRetentionDuration does not work. Based on what the docs say, this could cause some confusion. @ion-elgreco I'll try to find some time over the next few days to make a PR and update the documentation.

As a general question, @ion-elgreco: I need to make this change in a production environment. Do you know if it's OK to temporarily stop our writers (they're a Python lambda that runs write_deltalake), change the table properties to set delta.logRetentionDuration using Spark, create a checkpoint and run cleanup_metadata, and then re-enable the writers with the new configuration mapping set in write_deltalake?
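Putting the findings so far together (a checkpoint first, the delta.-prefixed key, and a value of the form interval <n> day), a working sequence on python-v0.15.3 might look like the sketch below. The helper names are hypothetical, and it is an assumption here that the configuration passed to write_deltalake takes effect when the table is first created.

```python
def retention_config(days: int) -> dict:
    """Build the table configuration in the form reported to work in this thread."""
    if days < 0:
        raise ValueError("days must be non-negative")
    return {"delta.logRetentionDuration": f"interval {days} day"}


def write_checkpoint_cleanup(path, data, days=2):
    """Append `data`, then checkpoint and clean up old log files."""
    # Imported lazily so retention_config() can be used without deltalake installed.
    from deltalake import DeltaTable, write_deltalake

    write_deltalake(path, data, mode="append", configuration=retention_config(days))
    dt = DeltaTable(path)
    # Per the thread, a checkpoint has to be created before cleanup_metadata
    # removes anything; delta-rs did not create one automatically at this version.
    dt.create_checkpoint()
    dt.cleanup_metadata()
```

For example, write_checkpoint_cleanup("./tables/delta_table_merge", chunk) would replace the write_deltalake call in the repro loop.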

@ion-elgreco
Collaborator

ion-elgreco commented Feb 11, 2024

@liamphmurphy here you need to change it: https://github.com/delta-io/delta-rs/blob/02fa4c29fe610842cc072efe1154e941dc3613d7/crates/core/src/table/config.rs#L511C4-L511C18, likely change everything to singular | plural => is enough, so day | days, but maybe better to switch to plural form

@liamphmurphy I think that's fine to do, you could likely also run alter set table_properties in Spark at same time
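To illustrate the singular | plural idea, here is a rough Python sketch of an interval parser that accepts both forms. It is not the delta-rs implementation (that lives in the Rust file linked above); the function name, the unit table, and the error messages are assumptions made for the example.

```python
from datetime import timedelta

# Singular unit names; a trailing "s" is stripped before lookup.
_UNITS = {
    "second": timedelta(seconds=1),
    "minute": timedelta(minutes=1),
    "hour": timedelta(hours=1),
    "day": timedelta(days=1),
    "week": timedelta(weeks=1),
}


def parse_interval(value: str) -> timedelta:
    """Parse strings such as 'interval 2 day' or 'interval 2 days'."""
    parts = value.split()
    if len(parts) != 3 or parts[0] != "interval":
        raise ValueError(f"expected 'interval <number> <unit>', got {value!r}")
    _, number, unit = parts
    if not number.isdecimal():
        raise ValueError(f"not a non-negative integer: {number!r}")
    singular = unit[:-1] if unit.endswith("s") else unit
    if singular not in _UNITS:
        raise ValueError(f"unknown unit: {unit!r}")
    return int(number) * _UNITS[singular]
```

With this, "interval 2 day" and "interval 2 days" both give a two-day timedelta, while a bare "2 days" raises a ValueError.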

@liamphmurphy
Contributor Author

@liamphmurphy here you need to change it: https://github.com/delta-io/delta-rs/blob/02fa4c29fe610842cc072efe1154e941dc3613d7/crates/core/src/table/config.rs#L511C4-L511C18, likely change everything to singular | plural => is enough, so day | days, but maybe better to switch to plural form

@liamphmurphy I think that's fine to do, you could likely also run alter set table_properties in Spark at same time

Thanks 👍 I also wonder if errors are not being raised on the Python side? Since my string didn't start with interval, I would have expected that to error. I'll try to look at that as well over the next few days.

@ion-elgreco
Collaborator

@liamphmurphy here you need to change it: https://github.com/delta-io/delta-rs/blob/02fa4c29fe610842cc072efe1154e941dc3613d7/crates/core/src/table/config.rs#L511C4-L511C18, likely change everything to singular | plural => is enough, so day | days, but maybe better to switch to plural form
@liamphmurphy I think that's fine to do, you could likely also run alter set table_properties in Spark at same time

Thanks 👍 I also wonder if errors are not being raised on the Python side? Since my string didn't start with interval, I would have expected that to error. I'll try to look at that as well over the next few days.

We don't do any checks on the syntax of the interval but you could do this if you want! :)
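Until such a check exists in the library, a caller could validate the configuration before handing it to write_deltalake. The sketch below is hypothetical (function name, key set, and regex are all assumptions): it flags a known duration property written without its delta. prefix and rejects values that are not of the form interval <n> <unit>. The pattern allows an optional trailing "s"; per this thread, python-v0.15.3 only parsed the singular form.

```python
import re

# Duration-valued Delta table properties checked by this sketch.
_DURATION_KEYS = {
    "delta.logRetentionDuration",
    "delta.deletedFileRetentionDuration",
}
_INTERVAL_RE = re.compile(r"interval \d+ (second|minute|hour|day|week)s?")


def check_configuration(configuration: dict) -> None:
    """Raise ValueError for the two mistakes discussed in this thread."""
    for key, value in configuration.items():
        if f"delta.{key}" in _DURATION_KEYS:
            raise ValueError(f"{key!r} is missing its prefix; use 'delta.{key}'")
        if key in _DURATION_KEYS and not _INTERVAL_RE.fullmatch(value):
            raise ValueError(
                f"{key}={value!r} is not of the form 'interval <n> <unit>'"
            )
```

Keys outside the known set are left alone, since table properties can hold arbitrary user-defined entries.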

@rtyler rtyler added the binding/python Issues for the Python package label Feb 16, 2024
ion-elgreco added a commit that referenced this issue Mar 6, 2024
# Description
Spark-scala uses the plural form to construct intervals, so "day**s**"
instead of "day". To keep backwards compatibility, I kept the singular
form as well.

- closes #2180
- closes: #2072
3 participants