Pyarrow filter not pushed to scan_ds if datatype is a string #6395

Closed
dominikpeter opened this issue Jan 23, 2023 · 18 comments · Fixed by #6734
Labels: bug (Something isn't working), python (Related to Python Polars)


@dominikpeter

dominikpeter commented Jan 23, 2023

Polars version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest version of Polars.

Issue description

I see quite significant performance differences when filtering a pyarrow dataset with its scanner method compared to Polars' own filtering. After some testing, I found a strange behavior that could explain the difference: could it be that predicates are not pushed down to the pyarrow dataset if the datatype is a string?

In the output below, the predicate is None for the first example (the string filter), while the other two examples, which filter on an integer, receive a proper predicate:

predicate = None
with_columns = ['animal']

predicate = (pa.dataset.field('n_legs') == 2)
with_columns = ['animal']

predicate = (pa.dataset.field('year') == 2020)
with_columns = ['animal']

Therefore, on a bigger dataset, ds.scanner(filter=ds.field("animal") == "Flamingo") is way faster.

Or am I missing something?
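For reference, this is roughly the manual pyarrow-side filtering I benchmarked against (a minimal sketch; it assumes the same file.parquet written in the reproducible example below):

import pyarrow.dataset as ds
import polars as pl

dataset = ds.dataset("file.parquet", format="parquet")

# The string predicate is evaluated by pyarrow while scanning, so only the
# matching rows are materialized before being handed over to Polars.
filtered = dataset.scanner(filter=ds.field("animal") == "Flamingo").to_table()
df = pl.from_arrow(filtered)
print(df)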

Reproducible example

import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

from typing import cast
from functools import partial
import pickle

table = pa.table(
    {
        "year": [2020, 2022, 2021, 2022, 2019, 2021],
        "n_legs": [2, 2, 4, 4, 5, 100],
        "animal": ["Flamingo", "Parrot", "Dog", "Horse", "Brittle stars", "Centipede"],
    }
)


pq.write_table(table, "file.parquet")

dataset = ds.dataset("file.parquet", format="parquet")


def _scan_ds_impl(ds: pa.dataset.Dataset, with_columns: list[str] | None, predicate: str | None) -> pl.DataFrame:
    # Called by the lazy engine with the projected columns and the (stringified) pyarrow predicate.
    print(f"predicate = {predicate}")
    print(f"with_columns = {with_columns}")

    _filter = None
    if predicate:
        _filter = eval(predicate)
    return cast(pl.DataFrame, pl.from_arrow(ds.to_table(columns=with_columns, filter=_filter)))


def _scan_ds(ds: pa.dataset.Dataset, allow_pyarrow_filter: bool = True) -> pl.LazyFrame:
    # Register the pickled scan function so Polars can push projections and predicates into it.
    func = partial(_scan_ds_impl, ds)
    func_serialized = pickle.dumps(func)
    return pl.LazyFrame._scan_python_function(ds.schema, func_serialized, allow_pyarrow_filter)


df = (
    _scan_ds(dataset, allow_pyarrow_filter=True)
    .select(pl.col("animal"))
    .filter(pl.col("animal") == "Flamingo")
    .collect()
)
print("----------------")
df = _scan_ds(dataset, allow_pyarrow_filter=True).select(pl.col("animal")).filter(pl.col("n_legs") == 2).collect()


print("----------------")
df = _scan_ds(dataset, allow_pyarrow_filter=True).select(pl.col("animal")).filter(pl.col("year") == 2020).collect()

Expected behavior

Also push down the string predicate:

predicate = (pa.dataset.field('animal') == 'Flamingo')
with_columns = ['animal']

predicate = (pa.dataset.field('n_legs') == 2)
with_columns = ['animal']

predicate = (pa.dataset.field('year') == 2020)
with_columns = ['animal']

Installed versions

---Version info---
Polars: 0.15.16
Index type: UInt32
Platform: macOS-13.1-arm64-arm-64bit
Python: 3.11.1 (v3.11.1:a7a450f84a, Dec 6 2022, 15:24:06) [Clang 13.0.0 (clang-1300.0.29.30)]
---Optional dependencies---
pyarrow: 10.0.1
pandas: 1.5.2
numpy: 1.24.1
fsspec:
connectorx:
xlsx2csv:
deltalake:
matplotlib:

dominikpeter added the bug (Something isn't working) and python (Related to Python Polars) labels on Jan 23, 2023
@ritchie46
Member

@stinodego that would explain why partitioning must be set in scan_delta.

@dominikpeter
Author

Could we help here somehow? I think the issue is not in Python but in the Rust backend, though we don't really know where to start debugging.
We cannot really use Delta Lake with Polars at the moment, because we would have to load far too much data into memory. We can filter the data first and then use from_arrow to load it; with a z-ordered table, that works well even on large datasets.
So we can work around it. However, we would like to use Polars' native filter method, because we love Polars :-).

@stinodego
Member

I have to look at this more closely - I admit it fell off my radar. Will come back to this later this weekend.

@stinodego stinodego self-assigned this Feb 3, 2023
@stinodego
Member

stinodego commented Feb 5, 2023

> @stinodego that would explain why partitioning must be set in scan_delta.

That is a different issue. scan_delta does the following:

  1. Map the file to a DeltaTable object (deltalake dependency takes care of this)
  2. Convert to a pyarrow dataset (deltalake dependency takes care of this)
  3. Call pl.scan_ds

Any subsequent filters are pushed to scan_ds just fine. The problem is that these filters are not pushed further to step 2, where the conversion happens from Delta to pyarrow. This is required to take advantage of on-disk partitioning that Delta uses (if you're unfamiliar with Delta, it's just Parquet files with a layer on top).

The way to handle this right now is to specify filters manually in step 2. I wouldn't know a good way to automate this.
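For example, a rough sketch of that manual step using the deltalake package (the table path and partition values are made up, and this assumes to_pyarrow_dataset accepts a list of partition filter tuples as described in its docs):

import polars as pl
from deltalake import DeltaTable

dt = DeltaTable("path/to/delta_table")  # hypothetical table path

# Step 2: pass partition filters to the Delta -> pyarrow conversion so that
# only the matching partition directories are read from disk.
dataset = dt.to_pyarrow_dataset(partitions=[("year", "=", "2021")])

# Step 3: any further Polars filters are applied on top of the pyarrow dataset.
lf = pl.scan_ds(dataset)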

On topic

About the original issue from this post: I can confirm that string predicates are not pushed down correctly. Thanks for the clear example. I'm really not very familiar with this scanning code - @ritchie46 could you take a closer look?

@dominikpeter
Author

dominikpeter commented Feb 5, 2023


I need to look at this more closely, but I think that if the filters are properly pushed into scan_ds, the partitions are filtered as well and file skipping kicks in. At least this is described here: reading partitioned data

Regarding the scan_ds issue, I can also confirm that none of the predicates are pushed through when a string is involved. In the example below, both filters are dropped:

df = (
    _scan_ds(dataset, allow_pyarrow_filter=True)
    .select([pl.col("animal"), pl.col("year")])
    .filter((pl.col("year") == 2020) & (pl.col("animal") == "Flamingo"))
    .collect()
)

predicate = None
with_columns = ['animal', 'year']

@aersam

aersam commented Feb 6, 2023

Something to consider: delta-io/delta-rs#1128
ADBC support would most likely supersede the current implementation?

@chitralverma
Contributor

chitralverma commented Feb 7, 2023

> Any subsequent filters are pushed to scan_ds just fine. The problem is that these filters are not pushed further to step 2, where the conversion happens from Delta to pyarrow. This is required to take advantage of on-disk partitioning that Delta uses (if you're unfamiliar with Delta, it's just Parquet files with a layer on top).

@stinodego your above explanation is correct, but won't this affect anything that relies on scan_ds, not just scan_delta?

Let me know if you need some help regarding this, because I saw this as a potential bottleneck while adding the functionality. I considered using the deltalake lib only to get the paths of the parquet files for the table version and then calling scan_parquet internally in scan_delta, but that did not seem like the right way to go either.

@dominikpeter
Author

dominikpeter commented Feb 7, 2023


I think there are two problems with the scan_parquet approach. First, scan_parquet only allows glob patterns, so you would need to merge them somehow. Second, you would lose the benefit of the statistics: they are stored in the pyarrow dataset fragments, so file skipping would not be possible.
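To show what I mean by the statistics living on the fragments, a rough sketch using the small file.parquet from my first example (pyarrow exposes per-row-group min/max statistics on parquet fragments, as far as I understand its API):

import pyarrow.dataset as ds

dataset = ds.dataset("file.parquet", format="parquet")

# Each parquet fragment carries row-group statistics (min/max per column) that
# a scanner can use to skip files or row groups that cannot match a predicate.
for fragment in dataset.get_fragments():
    fragment.ensure_complete_metadata()
    for row_group in fragment.row_groups:
        print(fragment.path, row_group.id, row_group.statistics)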

I believe that if scan_ds passes all predicates to the underlying pyarrow dataset, the problem with the partitions is solved too.

@chitralverma by the way, I saw that delta-rs 0.7.0 was released, which would allow the DeltaStorageHandler to be picklable (delta-rs issue 1016). It wouldn't solve this issue, though.

@chitralverma
Contributor

> First, scan_parquet only allows glob patterns, so you would need to merge them somehow.

For lazy DataFrames, concat is a zero-copy op, so that is easy.

> Second, you would lose the benefit of the statistics

Actually, I confirmed with some of the Delta folks how this works for another use case: if I use the deltalake lib to just list the parquet files for a specified version, the Delta log is looked up and the file skipping actually happens. The main question is this: does the Polars parquet reader use parquet statistics to optimize IO? @ritchie46 maybe you can clarify this one.

@dominikpeter
Author

dominikpeter commented Feb 7, 2023

I did some tests on my side on the dataset we are struggling with:

import polars as pl
from deltalake import DeltaTable

file_url = "data/delta/item"

# memory_usage is a small profiling decorator used for these tests
# (prints memory usage and timing; definition not shown).
@memory_usage
def scan():
    df = pl.scan_delta(file_url)
    print(df.filter(pl.col("item") == "00009501").fetch(5))

@memory_usage
def concat():
    dt = DeltaTable(file_url)
    dfs = pl.concat([pl.scan_parquet(i) for i in dt.file_uris()])
    print(dfs.filter(pl.col("item") == "00009501").fetch(5))

Function Name : scan
Current memory usage: 0.178867MB
Peak : 3.096506MB
Timing Took : 11.0809 seconds

Function Name : concat
Current memory usage: 0.001219MB
Peak : 0.021559MB
Timing Took : 0.0959 seconds

The parquet concat method is significantly faster in my example.

@chitralverma thanks a lot! We can already use this internally for our use case.

@chitralverma
Contributor

@dominikpeter try setting rechunk to False in pl.concat and see if it gets better

@dominikpeter
Author

from datetime import datetime

import numpy as np
import polars as pl
from deltalake import DeltaTable

file_url = "data/delta/item"  # same table as in the test above

times = []

for i in range(10):
    start = datetime.now()
    dt = DeltaTable(file_url)
    dfs = pl.concat([pl.scan_parquet(i) for i in dt.file_uris()], rechunk=False)
    dfs.filter((pl.col("item") == "01523048")).collect(streaming=True)
    end = datetime.now()
    delta = (end - start).total_seconds()
    times.append(delta)

print("rechunk=False")
print(np.array(times).mean())
print(np.array(times).std())

times = []

for i in range(10):
    start = datetime.now()
    dt = DeltaTable(file_url)
    dfs = pl.concat([pl.scan_parquet(i) for i in dt.file_uris()], rechunk=True)
    dfs.filter((pl.col("item") == "01523048")).collect(streaming=True)
    end = datetime.now()
    delta = (end - start).total_seconds()
    times.append(delta)

print("rechunk=True")
print(np.array(times).mean())
print(np.array(times).std())

rechunk=False
2.7260541
0.08526206672776585

rechunk=True
2.8425098
0.21936449264126584

@chitralverma it slightly improved the performance in my example

@chitralverma
Contributor

OK, I expected that.
So this can technically work. Here are my suggestions for further development of this connector in Polars:

  • We change the implementation internally to not rely on pyarrow and scan_ds and instead rely on scan_parquet like you just tried, OR,
  • We move all of this to the Rust side and do the same there. The gain is that the for loop you just wrote is sequential and could be sped up by running it in parallel with async or something.

What I am looking for is an argument against this approach. I don't think we are missing out on any capabilities but I am also not 100% sure.

BTW, sorry for taking your time to test these things, but if you have a Delta table in object storage (any provider), can you try the scan_delta vs scan_parquet test on that? My guess is that in that case the results are not going to be very different.

@dominikpeter
Author

dominikpeter commented Feb 7, 2023

I don't think there is a real downside. This approach is also described for Dask dataframes here: https://delta-io.github.io/delta-rs/python/usage.html

I think this could live on the Rust side and benefit from the things you mentioned. I would prefer this approach.

No problem, I'm the one who should be thanking you :-). I will test it tomorrow and give feedback.

@dominikpeter
Author

I was not able to make it work with an object store (Azure storage account).

import polars as pl
from deltalake import DeltaTable

# Tried both URL forms:
file_url = "abfss://mycontainer@mystorage.dfs.core.windows.net/delta/item"
file_url = "abfss://mycontainer/delta/item"

storage_options = {
    "account_name": "mystorage",
    "account_key": "mykey",
}

@memory_usage  # same profiling decorator as before
def concat():
    dt = DeltaTable(file_url, storage_options=storage_options)
    print(dt.file_uris())
    dfs = pl.concat([pl.scan_parquet(i, storage_options=storage_options) for i in dt.file_uris()])
    print(dfs.filter(pl.col("item") == "8043152").collect(streaming=True))

delta-rs gives me back the correct parquet files with the correct URIs.

But Polars results in an error:
FileNotFoundError: No such file or directory: abfss://xxxxxxx

We discussed it a little bit internally. For the moment, we can push down the filters to the pyarrow dataset or use the concat approach. This will work for our use case.

I guess it makes sense to wait for ADBC before putting too much effort into this. One certain downside of the concat-parquet approach concerns future releases of delta-rs, when they want to support column mapping, computed columns and so on.

I still believe it would make sense to fix the scan_ds filter pushdown bug, though :-).

@chitralverma
Contributor

The file-not-found issue is because the abfss scheme is not supported by fsspec.

See the supported schemes in the Azure example here:

https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.scan_delta.html#polars.scan_delta

But this can be changed now that the pickling issue is fixed on their side; however, in Polars we will then have to pin the deltalake dependency to 0.7+.

ADBC will require a conversion of the Polars plan to SQL for pushdown, which does not exist in Polars at the moment.

@dominikpeter
Author

I did some extensive tests with the application we are running, comparing the scan_delta and concat-parquet approaches.

For example, we are using offset, which is way slower with the concat-parquet approach. However, filtering on a string column is of course faster. Still, I believe it could be faster if parquet files that don't contain the record I am filtering for were skipped.

The results were quite mixed. I think for the moment I would keep it as it is. Maybe bump deltalake to 0.7 to take advantage of the new pickling support.

What we will probably end up doing is something like this:

import polars as pl
import pyarrow.dataset as ds
from deltalake import DeltaTable

if filter:  # our own flag indicating whether a filter was requested
    dt = DeltaTable(file_url)
    table = dt.to_pyarrow_dataset().scanner(filter=ds.field("item") == "8043152").to_table()
    df = pl.from_arrow(table)  # eager DataFrame
else:
    df = pl.scan_delta(file_url)  # LazyFrame

@ritchie46
Member

The problem seems to be that we don't push the predicate down to _scan_ds. I will take a look.
