diff --git a/hail/python/hailtop/aiotools/fs/copier.py b/hail/python/hailtop/aiotools/fs/copier.py index b367c069e48..913f00da4a6 100644 --- a/hail/python/hailtop/aiotools/fs/copier.py +++ b/hail/python/hailtop/aiotools/fs/copier.py @@ -1,4 +1,4 @@ -from typing import Any, AsyncIterator, Awaitable, Optional, List, Union, Dict, Callable +from typing import Any, AsyncIterator, Awaitable, Optional, List, Union, Dict, Callable, Tuple import os import os.path import asyncio @@ -163,7 +163,10 @@ def add_source_reports(transfer_report): print(f' Time: {humanize_timedelta_msecs(self._duration)}') assert self._duration is not None if self._duration > 0: - print(f' Average transfer rate: {humanize.naturalsize(total_bytes / (self._duration / 1000))}/s') + bandwidth = humanize.naturalsize(total_bytes / (self._duration / 1000)) + print(f' Average bandwidth: {bandwidth}/s') + file_rate = total_files / (self._duration / 1000) + print(f' Average file rate: {file_rate:,.1f}/s') print('Sources:') for sr in source_reports: @@ -289,8 +292,6 @@ async def _copy_file_multi_part( destfile: str, return_exceptions: bool ) -> None: - source_report.start_files(1) - source_report.start_bytes(await srcstat.size()) success = False try: await self._copy_file_multi_part_main(sema, source_report, srcfile, srcstat, destfile, return_exceptions) @@ -350,6 +351,8 @@ async def copy_as_file(self, if full_dest_type == AsyncFS.DIR: raise IsADirectoryError(full_dest) + source_report.start_files(1) + source_report.start_bytes(await srcstat.size()) await self._copy_file_multi_part(sema, source_report, src, srcstat, full_dest, return_exceptions) async def copy_as_dir(self, sema: asyncio.Semaphore, source_report: SourceReport, return_exceptions: bool): @@ -394,19 +397,26 @@ async def copy_source(srcentry: FileListEntry) -> None: await self._copy_file_multi_part(sema, source_report, srcfile, await srcentry.status(), url_join(full_dest, relsrcfile), return_exceptions) - async def create_copies() -> List[Callable[[], Awaitable[None]]]: + async def create_copies() -> Tuple[List[Callable[[], Awaitable[None]]], int]: nonlocal srcentries + bytes_to_copy = 0 if srcentries is None: srcentries = await files_iterator() try: - return [ - functools.partial(copy_source, srcentry) - async for srcentry in srcentries] + copy_thunks = [] + async for srcentry in srcentries: + # In cloud FSes, status and size never make a network request. In local FS, they + # can make system calls on symlinks. This line will be fairly expensive if + # copying a tree with a lot of symlinks. + bytes_to_copy += await (await srcentry.status()).size() + copy_thunks.append(functools.partial(copy_source, srcentry)) + return (copy_thunks, bytes_to_copy) finally: srcentries = None - copies = await retry_transient_errors(create_copies) - + copies, bytes_to_copy = await retry_transient_errors(create_copies) + source_report.start_files(len(copies)) + source_report.start_bytes(bytes_to_copy) await bounded_gather2(sema, *copies, cancel_on_error=True) async def copy(self, sema: asyncio.Semaphore, source_report: SourceReport, return_exceptions: bool):