Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[copier] minor clean up to copier #13820

Merged
merged 2 commits into from
Oct 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions hail/python/hailtop/aiotools/fs/copier.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, AsyncIterator, Awaitable, Optional, List, Union, Dict, Callable
from typing import Any, AsyncIterator, Awaitable, Optional, List, Union, Dict, Callable, Tuple
import os
import os.path
import asyncio
Expand Down Expand Up @@ -163,7 +163,10 @@ def add_source_reports(transfer_report):
print(f' Time: {humanize_timedelta_msecs(self._duration)}')
assert self._duration is not None
if self._duration > 0:
print(f' Average transfer rate: {humanize.naturalsize(total_bytes / (self._duration / 1000))}/s')
bandwidth = humanize.naturalsize(total_bytes / (self._duration / 1000))
print(f' Average bandwidth: {bandwidth}/s')
file_rate = total_files / (self._duration / 1000)
print(f' Average file rate: {file_rate:,.1f}/s')
jigold marked this conversation as resolved.
Show resolved Hide resolved

print('Sources:')
for sr in source_reports:
Expand Down Expand Up @@ -289,8 +292,6 @@ async def _copy_file_multi_part(
destfile: str,
return_exceptions: bool
) -> None:
source_report.start_files(1)
source_report.start_bytes(await srcstat.size())
success = False
try:
await self._copy_file_multi_part_main(sema, source_report, srcfile, srcstat, destfile, return_exceptions)
Expand Down Expand Up @@ -350,6 +351,8 @@ async def copy_as_file(self,
if full_dest_type == AsyncFS.DIR:
raise IsADirectoryError(full_dest)

source_report.start_files(1)
source_report.start_bytes(await srcstat.size())
await self._copy_file_multi_part(sema, source_report, src, srcstat, full_dest, return_exceptions)

async def copy_as_dir(self, sema: asyncio.Semaphore, source_report: SourceReport, return_exceptions: bool):
Expand Down Expand Up @@ -394,19 +397,26 @@ async def copy_source(srcentry: FileListEntry) -> None:

await self._copy_file_multi_part(sema, source_report, srcfile, await srcentry.status(), url_join(full_dest, relsrcfile), return_exceptions)

async def create_copies() -> List[Callable[[], Awaitable[None]]]:
async def create_copies() -> Tuple[List[Callable[[], Awaitable[None]]], int]:
nonlocal srcentries
bytes_to_copy = 0
if srcentries is None:
srcentries = await files_iterator()
try:
return [
functools.partial(copy_source, srcentry)
async for srcentry in srcentries]
copy_thunks = []
async for srcentry in srcentries:
# In cloud FSes, status and size never make a network request. On the local FS, they
# can make system calls on symlinks, so this line will be fairly expensive when
# copying a tree with a lot of symlinks.
bytes_to_copy += await (await srcentry.status()).size()
copy_thunks.append(functools.partial(copy_source, srcentry))
return (copy_thunks, bytes_to_copy)
finally:
srcentries = None

copies = await retry_transient_errors(create_copies)

copies, bytes_to_copy = await retry_transient_errors(create_copies)
source_report.start_files(len(copies))
source_report.start_bytes(bytes_to_copy)
await bounded_gather2(sema, *copies, cancel_on_error=True)

async def copy(self, sema: asyncio.Semaphore, source_report: SourceReport, return_exceptions: bool):
Expand Down