Skip to content

Commit

Permalink
feat: fsspec source non-blocking chunks (#979)
Browse files Browse the repository at this point in the history
* attempt at non-blocking

* call Future constructor correctly

* working future

* remove comment (fsspec/filesystem_spec#1388)

* add test for chunks

* add missing import

* chunks are now retrieved correctly, but retrieval is still blocking

* now it's non-blocking

* remove print from test

* remove timings from test

* add executor to class (no cleanup)

* cleanup executor

* use concurrent thread pool

* Update src/uproot/source/fsspec.py

Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>

* made sources not pickleable

* Revert "made sources not pickleable"

This reverts commit fc2759f.

* new callback

* remove comment

* add return type hint to `closed` and `num_bytes`

* more return type hints

* move "Chunk" class position in file to allow type hinting (no code changes)

* add `add_done_callback` to TrivialFuture

* remove tmp_path

* call callback immediately in trivial future

* Revert "move "Chunk" class position in file to allow type hinting (no code changes)"

This reverts commit d122e20.

* import annotations from __future__, type hints

* chunks return type hint

* import skip for aiohttp

* add annotations

* add license string

* remove check for http

* test with "use_threads" enabled and disabled

---------

Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>
  • Loading branch information
lobis and jpivarski authored Oct 12, 2023
1 parent 80718ab commit 9ea709b
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 48 deletions.
22 changes: 12 additions & 10 deletions src/uproot/source/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
:doc:`uproot.source.chunk.Source`, the primary types of the "physical layer."
"""

from __future__ import annotations

import numbers
import queue

Expand Down Expand Up @@ -47,7 +49,7 @@ class Source:
the file.
"""

def chunk(self, start, stop):
def chunk(self, start, stop) -> Chunk:
"""
Args:
start (int): Seek position of the first byte to include.
Expand All @@ -58,7 +60,7 @@ def chunk(self, start, stop):
:doc:`uproot.source.chunk.Chunk`.
"""

def chunks(self, ranges, notifications: queue.Queue):
def chunks(self, ranges, notifications: queue.Queue) -> list[Chunk]:
"""
Args:
ranges (list of (int, int) 2-tuples): Intervals to fetch
Expand Down Expand Up @@ -95,29 +97,29 @@ def file_path(self):
return self._file_path

@property
def num_bytes(self):
def num_bytes(self) -> int:
"""
The number of bytes in the file.
"""
return self._num_bytes

@property
def num_requests(self):
def num_requests(self) -> int:
"""
The number of requests that have been made (performance counter).
"""
return self._num_requests

@property
def num_requested_chunks(self):
def num_requested_chunks(self) -> int:
"""
The number of :doc:`uproot.source.chunk.Chunk` objects that have been
requested (performance counter).
"""
return self._num_requested_chunks

@property
def num_requested_bytes(self):
def num_requested_bytes(self) -> int:
"""
The number of bytes that have been requested (performance counter).
"""
Expand All @@ -130,7 +132,7 @@ def close(self):
self.__exit__(None, None, None)

@property
def closed(self):
def closed(self) -> bool:
"""
True if the associated file/connection/thread pool is closed; False
otherwise.
Expand All @@ -152,7 +154,7 @@ def __repr__(self):
type(self).__name__, path, self.num_workers, id(self)
)

def chunk(self, start, stop):
def chunk(self, start, stop) -> Chunk:
self._num_requests += 1
self._num_requested_chunks += 1
self._num_requested_bytes += stop - start
Expand All @@ -162,7 +164,7 @@ def chunk(self, start, stop):
self._executor.submit(future)
return chunk

def chunks(self, ranges, notifications: queue.Queue):
def chunks(self, ranges, notifications: queue.Queue) -> list[Chunk]:
self._num_requests += 1
self._num_requested_chunks += len(ranges)
self._num_requested_bytes += sum(stop - start for start, stop in ranges)
Expand Down Expand Up @@ -192,7 +194,7 @@ def num_workers(self):
return self._executor.num_workers

@property
def closed(self):
def closed(self) -> bool:
"""
True if the :doc:`uproot.source.futures.ResourceThreadPoolExecutor` has
been shut down and the file handles have been closed.
Expand Down
14 changes: 9 additions & 5 deletions src/uproot/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
:doc:`uproot.source.file.MultithreadedFileSource` is an automatic fallback.
"""

from __future__ import annotations

import os.path
import queue

import numpy

Expand Down Expand Up @@ -45,7 +47,7 @@ def file(self):
return self._file

@property
def closed(self):
def closed(self) -> bool:
return self._file.closed

def __enter__(self):
Expand Down Expand Up @@ -137,7 +139,7 @@ def __repr__(self):
fallback = " with fallback"
return f"<{type(self).__name__} {path}{fallback} at 0x{id(self):012x}>"

def chunk(self, start, stop):
def chunk(self, start, stop) -> uproot.source.chunk.Chunk:
if self._fallback is None:
if self.closed:
raise OSError(f"memmap is closed for file {self._file_path}")
Expand All @@ -153,7 +155,9 @@ def chunk(self, start, stop):
else:
return self._fallback.chunk(start, stop)

def chunks(self, ranges, notifications):
def chunks(
self, ranges, notifications: queue.Queue
) -> list[uproot.source.chunk.Chunk]:
if self._fallback is None:
if self.closed:
raise OSError(f"memmap is closed for file {self._file_path}")
Expand Down Expand Up @@ -195,7 +199,7 @@ def fallback(self):
return self._fallback

@property
def closed(self):
def closed(self) -> bool:
if self._fallback is None:
return self._file._mmap.closed
else:
Expand All @@ -219,7 +223,7 @@ def __exit__(self, exception_type, exception_value, traceback):
self._fallback.__exit__(exception_type, exception_value, traceback)

@property
def num_bytes(self):
def num_bytes(self) -> int:
if self._fallback is None:
return self._file._mmap.size()
else:
Expand Down
61 changes: 46 additions & 15 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot5/blob/main/LICENSE

from __future__ import annotations

import concurrent.futures
import queue

import uproot
import uproot.source.chunk
import uproot.source.futures


class FSSpecSource(uproot.source.chunk.Source):
Expand All @@ -13,16 +22,28 @@ class FSSpecSource(uproot.source.chunk.Source):
to get many chunks in one request.
"""

def __init__(self, file_path, **kwargs):
def __init__(self, file_path, **options):
import fsspec.core

default_options = uproot.reading.open.defaults
self._use_threads = options.get("use_threads", default_options["use_threads"])
self._num_workers = options.get("num_workers", default_options["num_workers"])

# TODO: is timeout always valid?

# Remove uproot-specific options (should be done earlier)
exclude_keys = set(uproot.reading.open.defaults.keys())
opts = {k: v for k, v in kwargs.items() if k not in exclude_keys}
exclude_keys = set(default_options.keys())
opts = {k: v for k, v in options.items() if k not in exclude_keys}

self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **opts)

if self._use_threads:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self._num_workers
)
else:
self._executor = uproot.source.futures.TrivialExecutor()

# TODO: set mode to "read-only" in a way that works for all filesystems
self._file = self._fs.open(self._file_path)
self._fh = None
Expand All @@ -37,15 +58,25 @@ def __repr__(self):
path = repr("..." + self._file_path[-10:])
return f"<{type(self).__name__} {path} at 0x{id(self):012x}>"

def __getstate__(self):
    """
    Build the state used for pickling: a shallow copy of the instance
    dictionary with the ``_executor`` entry removed.

    The live ``__dict__`` is left untouched. A ``KeyError`` is raised if
    the instance has no ``_executor`` attribute.
    """
    # Presumably the executor is dropped because it cannot be pickled;
    # confirm against the executor types used by this source.
    picklable = self.__dict__.copy()
    del picklable["_executor"]
    return picklable

def __setstate__(self, state):
    """
    Args:
        state (dict): Instance dictionary, as produced by ``__getstate__``.

    Restores the instance from ``state`` and then calls ``self._open()``.
    """
    # The dictionary is adopted as-is (not copied), so the caller's object
    # becomes this instance's ``__dict__``.
    self.__dict__ = state
    # NOTE(review): ``_open`` is not visible in this view; since
    # ``__getstate__`` drops ``_executor``, it presumably has to recreate
    # it. Confirm that it exists and does so.
    self._open()

def __enter__(self):
    """
    Enter the context of ``self._file``, keep whatever it returns in
    ``self._fh``, and return this source so it can be used in a ``with``
    statement.
    """
    handle = self._file.__enter__()
    self._fh = handle
    return self

def __exit__(self, exception_type, exception_value, traceback):
    """
    Args:
        exception_type: Type of the exception raised in the ``with`` body,
            or None.
        exception_value: The exception instance, or None.
        traceback: The associated traceback, or None.

    Drops the cached file handle, shuts down the executor, and forwards
    the exit to ``self._file``.

    Nothing is returned, so an exception from the ``with`` body is never
    suppressed by this method.
    """
    self._fh = None
    try:
        self._executor.shutdown()
    finally:
        # Always release the underlying file, even if shutting down the
        # executor fails (or the executor attribute is missing); otherwise
        # the file would stay open.
        self._file.__exit__(exception_type, exception_value, traceback)

def chunk(self, start, stop):
def chunk(self, start, stop) -> uproot.source.chunk.Chunk:
"""
Args:
start (int): Seek position of the first byte to include.
Expand All @@ -66,7 +97,9 @@ def chunk(self, start, stop):
future = uproot.source.futures.TrivialFuture(data)
return uproot.source.chunk.Chunk(self, start, stop, future)

def chunks(self, ranges, notifications):
def chunks(
self, ranges, notifications: queue.Queue
) -> list[uproot.source.chunk.Chunk]:
"""
Args:
ranges (list of (int, int) 2-tuples): Intervals to fetch
Expand Down Expand Up @@ -97,28 +130,26 @@ def chunks(self, ranges, notifications):
self._num_requests += 1
self._num_requested_chunks += len(ranges)
self._num_requested_bytes += sum(stop - start for start, stop in ranges)
data = self._fs.cat_ranges(
[self._file_path] * len(ranges),
[start for start, _ in ranges],
[stop for _, stop in ranges],
)

chunks = []
for item, (start, stop) in zip(data, ranges):
future = uproot.source.futures.TrivialFuture(item)
for start, stop in ranges:
future = self._executor.submit(
self._fs.cat_file, self._file_path, start, stop
)
chunk = uproot.source.chunk.Chunk(self, start, stop, future)
uproot.source.chunk.notifier(chunk, notifications)()
future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
chunks.append(chunk)
return chunks

@property
def num_bytes(self):
def num_bytes(self) -> int:
"""
The number of bytes in the file.
"""
return self._fs.size(self._file_path)

@property
def closed(self):
def closed(self) -> bool:
"""
True if the associated file/connection/thread pool is closed; False
otherwise.
Expand Down
10 changes: 8 additions & 2 deletions src/uproot/source/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ class TrivialFuture:
def __init__(self, result):
self._result = result

def add_done_callback(self, callback, *, context=None):
    """
    Args:
        callback (callable): Called with this future as its only argument.
        context: Accepted but not used by this implementation
            (presumably kept for signature compatibility with
            asyncio-style futures).

    The callback is invoked immediately, in the calling thread, before this
    method returns. Any exception raised by the callback propagates to the
    caller.
    """
    callback(self)

def result(self, timeout=None):
"""
The result of this (Trivial)Future.
Expand Down Expand Up @@ -389,7 +395,7 @@ def close(self):
self.__exit__(None, None, None)

@property
def closed(self):
def closed(self) -> bool:
"""
True if the :doc:`uproot.source.futures.ResourceWorker` threads have
been stopped and their
Expand Down Expand Up @@ -449,7 +455,7 @@ def close(self):
self.__exit__(None, None, None)

@property
def closed(self):
def closed(self) -> bool:
"""
True if the :doc:`uproot.source.futures.ResourceTrivialExecutor` has
been stopped.
Expand Down
13 changes: 8 additions & 5 deletions src/uproot/source/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Despite the name, both sources support secure HTTPS (selected by URL scheme).
"""

from __future__ import annotations

import base64
import queue
Expand Down Expand Up @@ -593,7 +594,7 @@ def __repr__(self):
fallback = " with fallback"
return f"<{type(self).__name__} {path}{fallback} at 0x{id(self):012x}>"

def chunk(self, start, stop):
def chunk(self, start, stop) -> uproot.source.chunk.Chunk:
self._num_requests += 1
self._num_requested_chunks += 1
self._num_requested_bytes += stop - start
Expand All @@ -603,7 +604,9 @@ def chunk(self, start, stop):
self._executor.submit(future)
return chunk

def chunks(self, ranges, notifications):
def chunks(
self, ranges, notifications: queue.Queue
) -> list[uproot.source.chunk.Chunk]:
if self._fallback is None:
self._num_requests += 1
self._num_requested_chunks += len(ranges)
Expand Down Expand Up @@ -639,7 +642,7 @@ def executor(self):
return self._executor

@property
def closed(self):
def closed(self) -> bool:
return self._executor.closed

def __enter__(self):
Expand All @@ -658,7 +661,7 @@ def timeout(self):
return self._timeout

@property
def num_bytes(self):
def num_bytes(self) -> int:
if self._num_bytes is None:
self._num_bytes = get_num_bytes(
self._file_path, self.parsed_url, self._timeout
Expand Down Expand Up @@ -756,7 +759,7 @@ def timeout(self):
return self._timeout

@property
def num_bytes(self):
def num_bytes(self) -> int:
if self._num_bytes is None:
self._num_bytes = get_num_bytes(
self._file_path, self.parsed_url, self._timeout
Expand Down
2 changes: 1 addition & 1 deletion src/uproot/source/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def obj(self):
return self._obj

@property
def closed(self):
def closed(self) -> bool:
return getattr(self._obj, "closed", False)

def __enter__(self):
Expand Down
Loading

0 comments on commit 9ea709b

Please sign in to comment.