Skip to content

Commit

Permalink
[copier] minor clean up to copier (#13820)
Browse files Browse the repository at this point in the history
1. File rate is more interesting for small files.
2. The source_report controls the progress bar. By updating it eagerly
while we are listing a directory, the progress bar is more accurate
sooner. We currently wait until we get a semaphore for a particular file
to update the progress bar.
  • Loading branch information
danking authored Oct 17, 2023
1 parent ecb7d86 commit f0328b6
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions hail/python/hailtop/aiotools/fs/copier.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, AsyncIterator, Awaitable, Optional, List, Union, Dict, Callable
from typing import Any, AsyncIterator, Awaitable, Optional, List, Union, Dict, Callable, Tuple
import os
import os.path
import asyncio
Expand Down Expand Up @@ -163,7 +163,10 @@ def add_source_reports(transfer_report):
print(f' Time: {humanize_timedelta_msecs(self._duration)}')
assert self._duration is not None
if self._duration > 0:
print(f' Average transfer rate: {humanize.naturalsize(total_bytes / (self._duration / 1000))}/s')
bandwidth = humanize.naturalsize(total_bytes / (self._duration / 1000))
print(f' Average bandwidth: {bandwidth}/s')
file_rate = total_files / (self._duration / 1000)
print(f' Average file rate: {file_rate:,.1f}/s')

print('Sources:')
for sr in source_reports:
Expand Down Expand Up @@ -289,8 +292,6 @@ async def _copy_file_multi_part(
destfile: str,
return_exceptions: bool
) -> None:
source_report.start_files(1)
source_report.start_bytes(await srcstat.size())
success = False
try:
await self._copy_file_multi_part_main(sema, source_report, srcfile, srcstat, destfile, return_exceptions)
Expand Down Expand Up @@ -350,6 +351,8 @@ async def copy_as_file(self,
if full_dest_type == AsyncFS.DIR:
raise IsADirectoryError(full_dest)

source_report.start_files(1)
source_report.start_bytes(await srcstat.size())
await self._copy_file_multi_part(sema, source_report, src, srcstat, full_dest, return_exceptions)

async def copy_as_dir(self, sema: asyncio.Semaphore, source_report: SourceReport, return_exceptions: bool):
Expand Down Expand Up @@ -394,19 +397,26 @@ async def copy_source(srcentry: FileListEntry) -> None:

await self._copy_file_multi_part(sema, source_report, srcfile, await srcentry.status(), url_join(full_dest, relsrcfile), return_exceptions)

async def create_copies() -> List[Callable[[], Awaitable[None]]]:
async def create_copies() -> Tuple[List[Callable[[], Awaitable[None]]], int]:
nonlocal srcentries
bytes_to_copy = 0
if srcentries is None:
srcentries = await files_iterator()
try:
return [
functools.partial(copy_source, srcentry)
async for srcentry in srcentries]
copy_thunks = []
async for srcentry in srcentries:
# In cloud FSes, status and size never make a network request. In local FS, they
# can make system calls on symlinks. This line will be fairly expensive if
# copying a tree with a lot of symlinks.
bytes_to_copy += await (await srcentry.status()).size()
copy_thunks.append(functools.partial(copy_source, srcentry))
return (copy_thunks, bytes_to_copy)
finally:
srcentries = None

copies = await retry_transient_errors(create_copies)

copies, bytes_to_copy = await retry_transient_errors(create_copies)
source_report.start_files(len(copies))
source_report.start_bytes(bytes_to_copy)
await bounded_gather2(sema, *copies, cancel_on_error=True)

async def copy(self, sema: asyncio.Semaphore, source_report: SourceReport, return_exceptions: bool):
Expand Down

0 comments on commit f0328b6

Please sign in to comment.