feat: fsspec source non-blocking chunks #979

Merged
merged 36 commits into from
Oct 12, 2023
Changes from 10 commits
20ffbfd
attempt at non-blocking
lobis Oct 6, 2023
eccd9a3
call Future constructor correctly
lobis Oct 6, 2023
7732ff5
working future
lobis Oct 6, 2023
20050ce
remove comment (https://github.com/fsspec/filesystem_spec/discussions…
lobis Oct 7, 2023
750f79e
add test for chunks
lobis Oct 7, 2023
00e25bc
add missing import
lobis Oct 10, 2023
c14cc3f
now chunks are retrieved correctly but is blocking
lobis Oct 10, 2023
55d10e8
now it's non-blocking
lobis Oct 10, 2023
b6bcee5
remove print from test
lobis Oct 10, 2023
af522fb
remove timings from test
lobis Oct 10, 2023
c86f154
add executor to class (no cleanup)
lobis Oct 10, 2023
aa52a82
cleanup executor
lobis Oct 10, 2023
3b7afa6
use concurrent thread pool
lobis Oct 10, 2023
935ff15
Update src/uproot/source/fsspec.py
lobis Oct 11, 2023
fc2759f
made sources not pickleable
lobis Oct 11, 2023
7b353d1
Revert "made sources not pickleable"
lobis Oct 11, 2023
d9bc9cf
Merge branch 'main-v510' into fsspec-chunks
lobis Oct 12, 2023
74a5461
Merge branch 'main-v510' into fsspec-chunks
lobis Oct 12, 2023
7d9a8bc
new callback
lobis Oct 12, 2023
779abe2
remove comment
lobis Oct 12, 2023
8802e85
add return type hint to `closed` and `num_bytes`
lobis Oct 12, 2023
606cb89
more return type hints
lobis Oct 12, 2023
d122e20
move "Chunk" class position in file to allow type hinting (no code ch…
lobis Oct 12, 2023
1cd6cc3
add `add_done_callback` to TrivialFuture
lobis Oct 12, 2023
3fd0f1b
remove tmp_path
lobis Oct 12, 2023
2d21ed2
call callback immediately in trivial future
lobis Oct 12, 2023
7c14c69
Revert "move "Chunk" class position in file to allow type hinting (no…
lobis Oct 12, 2023
cf0d69b
import annotations from __future__, type hints
lobis Oct 12, 2023
6435b9e
chunks return type hint
lobis Oct 12, 2023
f299585
import skip for aiohttp
lobis Oct 12, 2023
08d04d4
add annotations
lobis Oct 12, 2023
de1337f
add license string
lobis Oct 12, 2023
ce0d852
remove check for http
lobis Oct 12, 2023
d5a863c
test with "use_threads" enabled and disabled
lobis Oct 12, 2023
aa1a73b
Merge branch 'main-v510' into fsspec-chunks
lobis Oct 12, 2023
7edd178
merge with main
lobis Oct 12, 2023
20 changes: 12 additions & 8 deletions src/uproot/source/fsspec.py
@@ -97,17 +97,21 @@ def chunks(self, ranges, notifications):
         self._num_requests += 1
         self._num_requested_chunks += len(ranges)
         self._num_requested_bytes += sum(stop - start for start, stop in ranges)
-        data = self._fs.cat_ranges(
-            [self._file_path] * len(ranges),
-            [start for start, _ in ranges],
-            [stop for _, stop in ranges],
-        )
+
+        executor = uproot.source.futures.ResourceThreadPoolExecutor(ranges)
+
         chunks = []
-        for item, (start, stop) in zip(data, ranges):
-            future = uproot.source.futures.TrivialFuture(item)
+        for start, stop in ranges:
+
+            def task(_range: tuple = (start, stop)):
+                return self._fs.cat_file(self._file_path, _range[0], _range[1])
+
+            future = uproot.source.futures.ResourceFuture(task)
             chunk = uproot.source.chunk.Chunk(self, start, stop, future)
-            uproot.source.chunk.notifier(chunk, notifications)()
+            future._set_notify(uproot.source.chunk.notifier(chunk, notifications))
+            executor.submit(future)
             chunks.append(chunk)
+
         return chunks
 
     @property
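
The change above replaces one blocking `cat_ranges` call with one task per range, each reporting completion through a notifier. The pattern can be sketched with the standard library alone. This is an illustration under stated assumptions, not the uproot implementation: `fetch_range` and `request_chunks` are hypothetical names, an in-memory `bytes` object stands in for the remote file, and `concurrent.futures.ThreadPoolExecutor` stands in for uproot's own executor and future classes.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def fetch_range(data: bytes, start: int, stop: int) -> bytes:
    # Hypothetical stand-in for a remote read such as fs.cat_file(path, start, stop).
    return data[start:stop]


def request_chunks(data, ranges, notifications, executor):
    """Submit one task per range and return the futures without waiting."""
    futures = {}
    for start, stop in ranges:

        # Bind the current range as a default argument; a plain closure would
        # look up start/stop when the task runs, not when it is defined.
        def task(_range=(start, stop)):
            return fetch_range(data, _range[0], _range[1])

        future = executor.submit(task)
        # The callback fires when the task finishes (or immediately if it has
        # already finished) and reports which range is ready.
        future.add_done_callback(
            lambda _f, _range=(start, stop): notifications.put(_range)
        )
        futures[(start, stop)] = future
    return futures


payload = bytes(range(256)) * 2
ranges = [(0, 100), (50, 55), (200, 400)]
notifications = queue.Queue()

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = request_chunks(payload, ranges, notifications, pool)
    # Completion order is not guaranteed, so collect by range, not by position.
    ready = {notifications.get(timeout=5) for _ in ranges}

results = {key: future.result() for key, future in futures.items()}
```

The default-argument binding in `task` matches the `_range: tuple = (start, stop)` parameter in the diff: without it, every task created in the loop could end up reading the last range.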
21 changes: 20 additions & 1 deletion tests/test_0692_fsspec.py
@@ -1,11 +1,11 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot4/blob/main/LICENSE

import pytest

import uproot
import uproot.source.fsspec

import skhep_testdata
import queue


@pytest.mark.network
@@ -66,3 +66,22 @@ def test_open_fsspec_xrootd():
         data = f["Events/run"].array(library="np", entry_stop=20)
         assert len(data) == 20
         assert (data == 194778).all()
+
+
+@pytest.mark.network
+def test_fsspec_chunks():
+    # Use the local HTTP server to serve test.root
+    url = "https://github.com/scikit-hep/scikit-hep-testdata/raw/v0.4.33/src/skhep_testdata/data/uproot-issue121.root"
+
+    notifications = queue.Queue()
+    with uproot.source.fsspec.FSSpecSource(url) as source:
+        chunks = source.chunks(
+            [(0, 100), (50, 55), (200, 400)], notifications=notifications
+        )
+        expected = {(chunk.start, chunk.stop): chunk for chunk in chunks}
+        while len(expected) > 0:
+            chunk = notifications.get()
+            expected.pop((chunk.start, chunk.stop))
+
+        chunk_data_sum = {sum(chunk.raw_data) for chunk in chunks}
+        assert chunk_data_sum == {3967, 413, 10985}, "Chunk data does not match"
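
The test waits on `notifications.get()` with no timeout, so a chunk that never arrives would hang the run. One possible variation, shown here as a sketch rather than a proposed change to the test, drains the queue with a timeout. The helper name `drain_notifications` is hypothetical, and the queue carries plain `(start, stop)` keys instead of chunk objects to keep the example self-contained.

```python
import queue
import threading


def drain_notifications(notifications, expected_keys, timeout=5.0):
    """Wait until every key in expected_keys has been reported, or raise."""
    pending = set(expected_keys)
    arrival_order = []
    while pending:
        try:
            key = notifications.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError(f"no notification for {sorted(pending)}") from None
        pending.discard(key)
        arrival_order.append(key)
    return arrival_order


notifications = queue.Queue()
keys = [(0, 100), (50, 55), (200, 400)]

# Simulate a background producer that reports the ranges out of order.
producer = threading.Thread(
    target=lambda: [notifications.put(key) for key in reversed(keys)]
)
producer.start()
order = drain_notifications(notifications, keys)
producer.join()
```

Tracking pending keys in a set, as the test does with its `expected` dictionary, keeps the check independent of the order in which the worker threads finish.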