-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: fsspec
source non-blocking chunks
#979
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't actually make the Source non-blocking (see below).
It could help to see how these chunks
are going to be used. Calls to the singular chunk
method are all over the rest-of-Uproot codebase, but most of those use the single chunk
immediately after requesting it, so it's not very important for them to be non-blocking, but they're non-blocking anyway.
The plural chunks
method is only used by the array-fetching loop:
uproot5/src/uproot/behaviors/TBranch.py
Lines 3005 to 3152 in 576fbdb
notifications = queue.Queue() | |
branchid_arrays = {} | |
branchid_num_baskets = {} | |
ranges = [] | |
range_args = {} | |
range_original_index = {} | |
original_index = 0 | |
branchid_to_branch = {} | |
for cache_key in branchid_interpretation: | |
branchid_num_baskets[cache_key] = 0 | |
for branch, basket_num, range_or_basket in ranges_or_baskets: | |
branchid_num_baskets[branch.cache_key] += 1 | |
if branch.cache_key not in branchid_arrays: | |
branchid_arrays[branch.cache_key] = {} | |
if isinstance(range_or_basket, tuple) and len(range_or_basket) == 2: | |
range_or_basket = ( # noqa: PLW2901 (overwriting range_or_basket) | |
int(range_or_basket[0]), | |
int(range_or_basket[1]), | |
) | |
ranges.append(range_or_basket) | |
range_args[range_or_basket] = (branch, basket_num) | |
range_original_index[range_or_basket] = original_index | |
else: | |
notifications.put(range_or_basket) | |
original_index += 1 | |
branchid_to_branch[branch.cache_key] = branch | |
for cache_key, interpretation in branchid_interpretation.items(): | |
if branchid_num_baskets[cache_key] == 0 and cache_key not in arrays: | |
arrays[cache_key] = interpretation.final_array( | |
{}, 0, 0, [0], library, None, interp_options | |
) | |
# check for CannotBeAwkward errors on the main thread before reading any data | |
if isinstance(library, uproot.interpretation.library.Awkward) and isinstance( | |
interpretation, uproot.interpretation.objects.AsObjects | |
): | |
branchid_to_branch[cache_key]._awkward_check(interpretation) | |
hasbranches._file.source.chunks(ranges, notifications=notifications) | |
def replace(ranges_or_baskets, original_index, basket): | |
branch, basket_num, range_or_basket = ranges_or_baskets[original_index] | |
ranges_or_baskets[original_index] = branch, basket_num, basket | |
def chunk_to_basket(chunk, branch, basket_num): | |
try: | |
cursor = uproot.source.cursor.Cursor(chunk.start) | |
basket = uproot.models.TBasket.Model_TBasket.read( | |
chunk, | |
cursor, | |
{"basket_num": basket_num}, | |
hasbranches._file, | |
hasbranches._file, | |
branch, | |
) | |
original_index = range_original_index[(chunk.start, chunk.stop)] | |
if update_ranges_or_baskets: | |
replace(ranges_or_baskets, original_index, basket) | |
except Exception: | |
notifications.put(sys.exc_info()) | |
else: | |
notifications.put(basket) | |
# all threads (if multithreaded), per branch, share a thread-local context for Forth | |
forth_context = {x: threading.local() for x in branchid_interpretation} | |
def basket_to_array(basket): | |
try: | |
assert basket.basket_num is not None | |
branch = basket.parent | |
interpretation = branchid_interpretation[branch.cache_key] | |
basket_arrays = branchid_arrays[branch.cache_key] | |
context = dict(branch.context) | |
context["forth"] = forth_context[branch.cache_key] | |
basket_arrays[basket.basket_num] = interpretation.basket_array( | |
basket.data, | |
basket.byte_offsets, | |
basket, | |
branch, | |
context, | |
basket.member("fKeylen"), | |
library, | |
interp_options, | |
) | |
if basket.num_entries != len(basket_arrays[basket.basket_num]): | |
raise ValueError( | |
"""basket {} in tree/branch {} has the wrong number of entries """ | |
"""(expected {}, obtained {}) when interpreted as {} | |
in file {}""".format( | |
basket.basket_num, | |
branch.object_path, | |
basket.num_entries, | |
len(basket_arrays[basket.basket_num]), | |
interpretation, | |
branch.file.file_path, | |
) | |
) | |
basket = None | |
if len(basket_arrays) == branchid_num_baskets[branch.cache_key]: | |
arrays[branch.cache_key] = interpretation.final_array( | |
basket_arrays, | |
entry_start, | |
entry_stop, | |
branch.entry_offsets, | |
library, | |
branch, | |
interp_options, | |
) | |
# no longer needed, save memory | |
basket_arrays.clear() | |
except Exception: | |
notifications.put(sys.exc_info()) | |
else: | |
notifications.put(None) | |
while len(arrays) < len(branchid_interpretation): | |
obj = notifications.get() | |
if isinstance(obj, uproot.source.chunk.Chunk): | |
args = range_args[(obj.start, obj.stop)] | |
decompression_executor.submit(chunk_to_basket, obj, *args) | |
elif isinstance(obj, uproot.models.TBasket.Model_TBasket): | |
interpretation_executor.submit(basket_to_array, obj) | |
elif obj is None: | |
pass | |
elif isinstance(obj, tuple) and len(obj) == 3: | |
uproot.source.futures.delayed_raise(*obj) | |
else: | |
raise AssertionError(obj) | |
obj = None # release before blocking |
The notifications
queue is the central object of the event loop. Although the Source is only responsible for putting Chunk
objects on that queue, the same queue is used for all steps of array-building:
start
-stop
pair of seek positions →Chunk
Chunk
→uproot.models.TBasket
, which ischunk_to_basket
uproot.models.TBasket
→ per-basket array, which isbasket_to_array
Outside of the loop, the per-basket arrays are concatenated into one array, which is returned to the user.
The event loop itself is this while loop.
So—what you want to do is return not-necessarily-filled Chunks
from the chunks
method and instrument the response of a request-response pair with a notifier function. As soon as a response comes in, call the notifier so that the Chunk
goes onto the notifications
queue, and the above code will get past the
uproot5/src/uproot/behaviors/TBranch.py
Line 3134 in 576fbdb
obj = notifications.get() |
line.
The PR is again ready to be reviewed. Now it should work as intended, the I have some questions regarding the management of the executor. Currently I am creating a local executor in the Looks like doing the lifecycle management would mean refactoring the current code though, since my |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comes down to a fundamental question: what is fsspec's scope? How much does it do? There's this cat_file
interface, which does the whole process from URL to file bytes, but it's a blocking interface. Is there an async interface? If so, does it have a way to attach a function to call when the async function is done?
If so, then FSSpecSource doesn't need to have an executor. Suppose there's a function
FS.async_cat_file(path: str, start: int, stop: int) -> concurrent.futures.Future
then you could use it like this:
for item, (start, stop) in zip(data, ranges):
future = self._fs.async_cat_file(self._file_path, start, stop)
chunk = uproot.source.chunk.Chunk(self, start, stop, future)
future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
chunks.append(chunk)
(although I think that add_done_callback
expects the function to take 1 argument and our notifier takes 0 arguments, but these things are easy to work around). If the fsspec interface uses asyncio.Future
instead of concurrent.futures.Future
(which we designed around), then it would be a little different, which could be done by wrapping it with the interface we expect (i.e. future.result()
instead of await future
).
So... what can fsspec do for us?
I don't understand this, don't you always need some kind of executor to run futures? |
The executor might be inside of the fsspec library. I don't know that—I'm just thinking it might be, and if it is, we should take advantage of it. Also, it might use |
From my understanding I imagine it would be possible to work with the sync filesystem but use an async one inside the Regarding this PR: I think it's done (unless there is some mistake) as currently the chunks interface is non-blocking. Depending on the complexity of leveraging the fsspec asyncio interface I will include it into this PR, a separate PR, or do nothing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good. Does it depend on #983?
Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>
This reverts commit fc2759f.
It does not. |
… code changes)" This reverts commit d122e20.
pytest.importorskip("aiohttp") in the failing function. That's a non-strict dependency, so any of our tests that use it have to be skipped when the package isn't available. This one failed because it's Python 3.12 and presumably the package isn't available yet, so I'm glad we got this chance to check. If there are any tests that need fsspec or another non-strict dependency to run, those tests need to have |
I think file.py needs |
This PR adds a non-blocking implementation of the
chunks
interface.It uses the
AbstractFileSystem.cat_file
api to get a byte range of the file. This api should only request the selected range (as opposed to the whole file, then slicing). In order to make sure this is the case I opened a discussion here.It also adds some type annotations to the
uproot.source
files.