
Python filesystems rewrite? #580

Closed
wants to merge 2 commits

Conversation

wjones127
Collaborator

Description

⚠️ This is a rough draft for now, since I want to make sure this new approach makes sense.

This PR would deprecate the existing DeltaStorageHandler that subclasses pyarrow.FileSystem in favor of a DeltaFileSystem that has a to_pyarrow() method to get the equivalent PyArrow filesystem. The idea is that for reading and writing data files we should use the PyArrow ones, while the delta log is read by the delta-rs one.
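As a rough sketch, the proposed API could look like the following (only the class name and `to_pyarrow()` come from the description above; the constructor, scheme handling, and the pyarrow mapping are assumptions for illustration):

```python
from urllib.parse import urlparse

class DeltaFileSystem:
    """Hypothetical filesystem delta-rs would use for the _delta_log."""

    def __init__(self, table_uri: str):
        self.table_uri = table_uri
        # Treat URIs without a scheme as local paths.
        self.scheme = urlparse(table_uri).scheme or "file"

    def to_pyarrow(self):
        """Return the equivalent pyarrow filesystem for data-file I/O."""
        import pyarrow.fs as pa_fs  # imported lazily; only needed here

        if self.scheme == "file":
            return pa_fs.LocalFileSystem()
        if self.scheme == "s3":
            return pa_fs.S3FileSystem()
        raise NotImplementedError(f"no pyarrow equivalent for {self.scheme!r}")
```

The point of the split is that `to_pyarrow()` hands data-file reads and writes to PyArrow's optimized filesystems, while the delta log stays with the delta-rs implementation.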

The other approach I looked at initially was just filling in the missing methods on the existing DeltaStorageHandler subclass.

Some notes on each filesystem:

  • S3: PyArrow's S3 filesystem is more optimized than what we'd be able to write here and is already complete, so for S3 it's a clear win.
  • GCS: PyArrow doesn't have GCS bindings yet, but the C++ implementation is done and the Python bindings are in progress. So we should expect that soon 🤞.
  • ADLS: ADLS is held up in PyArrow because the C++ library uses C++14 while Arrow is strictly C++11, at least for another year or so 😞. However, there is a community-supported fsspec implementation of ADLS, which could be combined with PyArrow's fsspec handler.

So if we went with this, we'd soon have two of the three filesystems supported in the writer, and the third could be supported by adding another library as an optional dependency.


@wjones127
Collaborator Author

Alternatively, maybe what we should do is create a FileSystem Rust object that can wrap any pyarrow.FileSystem object, and impl StorageBackend for that. You could even support any arbitrary fsspec filesystem through that too, and that's likely what Python users would want. The only hard part there is making sure we don't get deadlocks with the GIL 😬

@wjones127
Collaborator Author

wjones127 commented Mar 26, 2022

I think the interface I'd like to aim for is having filesystem parameters that can accept any pyarrow.FileSystem or fsspec filesystem. We don't want to use these as-is, at least by default, since they don't support rename_obj_noreplace, and that's required for concurrent writers. (Not all users may need this, but I think there's a strong case it should be supported by default.)
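
To make the requirement concrete: rename_obj_noreplace must atomically fail if the destination already exists. On a local filesystem that semantic can be built from os.link, which refuses to overwrite (a stdlib sketch for illustration; object stores need an equivalent server-side conditional operation):

```python
import os

def rename_obj_noreplace(src: str, dst: str) -> None:
    """Rename src to dst, raising FileExistsError if dst already exists."""
    os.link(src, dst)  # atomic on POSIX: fails if dst exists
    os.unlink(src)     # drop the old name once the new link is in place
```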

I think there are three different solutions we could implement:

  1. For the main pyarrow.FileSystem ones (S3 and soon GCS), we should automatically map them internally to their associated StorageBackend, and use that for delta log interaction. The other benefit of using these two separate filesystems is that they can each operate without the Python GIL and thus use multiple threads.
  2. For any other fsspec filesystem, we write a SafeDeltaFileSystem abstract mix-in that users could implement to add a rename_noreplace method and create a StorageBackend implementation that can wrap classes that use that mixin. This allows concurrent writers, but would be less performant than (1) since the GIL would preclude multi-threaded reads of the delta-log (though I honestly have no idea how noticeable that would be).
  3. For users that don't care about concurrent writers, we could also implement a wrapper around fsspec filesystems that just uses its move method for rename_noreplace. We would obviously want to document the caveat, but I imagine there are use cases where users would be fine with that.
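
Options (2) and (3) might look roughly like this (SafeDeltaFileSystem is the name proposed above; the wrapper class and its details are assumptions):

```python
from abc import ABC, abstractmethod

class SafeDeltaFileSystem(ABC):
    """Mix-in an fsspec filesystem can implement to allow concurrent writers."""

    @abstractmethod
    def rename_noreplace(self, src: str, dst: str) -> None:
        """Atomically rename src to dst, failing if dst already exists."""

class UnsafeRenameWrapper:
    """Option (3): emulate rename_noreplace with fsspec's non-atomic move.

    Unsafe with concurrent writers: two writers can both pass the exists()
    check and then overwrite each other's commit.
    """

    def __init__(self, fs):
        self.fs = fs  # any fsspec-style filesystem

    def rename_noreplace(self, src: str, dst: str) -> None:
        if self.fs.exists(dst):
            raise FileExistsError(dst)
        self.fs.mv(src, dst)  # check-then-move: not atomic
```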

@roeap
Collaborator

roeap commented Mar 26, 2022

Personally I think your approach makes a lot of sense.

There may, however, be an opportunity to support filesystems not supported by pyarrow out of the box, without requiring users to provide a custom implementation. Once #573 lands we should be able to write RecordBatches to the delta table using the storage backends from this crate. Obviously this will initially only support simple updates, but there are some experiments (#523) with using the datafusion query engine that could be extended to also support Spark-like delta SQL syntax for more complex updates.

I haven't really thought through yet whether this could be fully transparent to Python users, but thanks to Arrow, moving record batches across language boundaries should be very cheap :).

@houqp
Member

houqp commented Mar 28, 2022

Alternatively, maybe what we should do is create a FileSystem Rust object that can wrap any pyarrow.FileSystem object, and impl StorageBackend for that.

This would require ser/de between Rust bytes and Python bytes across the language runtime boundary on every write, wouldn't it? The current storage backend abstraction in the Rust core is not very well designed, because it requires reading the whole object into memory before passing the data down to the caller. Ideally, the abstraction should support streaming reads so we can parse json/parquet in a streaming fashion. So that's another thing to consider if we want to go this route.
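
Since each delta log commit file is newline-delimited JSON, a streaming read can hand back one action at a time instead of buffering the whole object (a stdlib sketch; a real backend would yield from an object-store byte stream):

```python
import json
from typing import IO, Iterator

def iter_log_actions(stream: IO[str]) -> Iterator[dict]:
    """Parse a delta log commit file one JSON action at a time."""
    for line in stream:
        line = line.strip()
        if line:  # skip blank lines rather than failing on them
            yield json.loads(line)
```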

@wjones127
Collaborator Author

Ideally, the abstraction should support streaming read so we can parse json/parquet in a streaming fashion.

@houqp I'm now thinking it might be better to simply work on improving the delta-rs filesystems and continue wrapping those in the Python module. For streaming reads and writes, is there a particular trait we should be implementing?

It's unclear to me what the parquet crate accepts. datafusion has ObjectStore and ObjectReader traits, but that crate is also an optional dependency.

@houqp
Member

houqp commented Apr 7, 2022

For streaming reads and writes, is there a particular trait we should be implementing?

I think the ObjectStore trait in datafusion is a good one to adopt. I have been pushing datafusion to move the object store abstraction out of the core so we can reuse it in other projects like delta-rs ;)

Async read/write was added to the parquet2 crate a long time ago. Async read has been added to parquet-rs recently; see apache/arrow-rs#1154. I have been working on the parquet2 migration on and off in #465. We will likely need to support both parquet crates behind a feature flag for some time, until we are confident that parquet2 can cover all our use cases. It's supposed to be the fastest parquet implementation out there, so I am hoping we will get a decent performance boost from the migration.

wjones127 closed this Apr 29, 2022
Development

Successfully merging this pull request may close these issues.

Python: Finish filesystem bindings
3 participants