From 40e42bd8ab9ea5e6997107bef5f41cf60e548b51 Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Tue, 25 Jul 2023 14:44:54 -0400 Subject: [PATCH] initial stitch impl --- pex/bin/pex.py | 4 +- pex/medusa.py | 217 -------------------------------------- pex/pex_builder.py | 174 +++++++++++++++--------------- tests/test_pex_builder.py | 13 +-- 4 files changed, 96 insertions(+), 312 deletions(-) delete mode 100644 pex/medusa.py diff --git a/pex/bin/pex.py b/pex/bin/pex.py index 5d566066a..c4e4696a4 100755 --- a/pex/bin/pex.py +++ b/pex/bin/pex.py @@ -161,8 +161,8 @@ def configure_clp_pex_options(parser): "tradeoffs. Both zipapp and packed layouts install themselves in the PEX_ROOT as loose " "apps by default before executing, but these layouts compose with `--venv` execution " "mode as well and support `--seed`ing. A stitched layout is the same as zipapp, but " - "may be significantly faster to create due to caching. A stitched layout requires the - `zip` command to be installed, however." + "may be significantly faster to create due to caching. A stitched layout requires the " + "`zip` command to be installed, however." ), ) diff --git a/pex/medusa.py b/pex/medusa.py deleted file mode 100644 index 4e2723cd9..000000000 --- a/pex/medusa.py +++ /dev/null @@ -1,217 +0,0 @@ -# Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). -# Licensed under the Apache License, Version 2.0 (see LICENSE). - -from __future__ import absolute_import - -import itertools -import os -from contextlib import contextmanager - -from medusa_zip import EntryName, FileSource -from medusa_zip.crawl import CrawlResult, Ignores, MedusaCrawl -from medusa_zip.destination import DestinationBehavior -from medusa_zip.merge import MedusaMerge, MergeGroup -from medusa_zip.zip import ( - AutomaticModifiedTimeStrategy, - CompressionMethod, - CompressionOptions, - EntryModifications, - MedusaZip, - ModifiedTimeBehavior, - Parallelism, - ZipOutputOptions, -) - -from pex.tracer import TRACER -from pex.typing import TYPE_CHECKING - -if TYPE_CHECKING: - from typing import ( - Any, - Callable, - DefaultDict, - Dict, - FrozenSet, - Iterable, - Iterator, - List, - NoReturn, - Optional, - Set, - Sized, - Tuple, - Union, - cast, - ) - - import attr # vendor:skip - - from pex.common import Chroot -else: - from pex.third_party import attr - from pex.typing import cast - - -class Label(str): - pass - - -class OutputFile(str): - pass - - -class MergeInput(str): - pass - - -@contextmanager -def working_dir(cwd): - prev = os.getcwd() - os.chdir(cwd) - try: - yield - finally: - os.chdir(prev) - - -@attr.s(frozen=True) -class FingerprintedDist(object): - source_path = attr.ib() # type: str - dist_name = attr.ib() # type: Label - # FIXME: this isn't used! - fingerprint = attr.ib() # type: str - - -class IntermediateZipCache(object): - _pyc_tmp_ignores = Ignores([r"\.pyc\.[0-9]+$"]) - - def __init__(self): - # type: () -> None - self._cache = {} # type: Dict[Label, FingerprintedDist] - - def for_label(self, label): - # type: (Label) -> Optional[FingerprintedDist] - return self._cache.get(label, None) - - def cache_intermediate_dist(self, path, dist_name, fingerprint): - # type: (str, str, str) -> None - assert os.path.isdir(path) # can be link - assert dist_name # can't be empty - label = Label(dist_name) - # TODO: just ignore this? 
- assert label not in self._cache - assert fingerprint # can't be empty - - self._cache[label] = FingerprintedDist( - source_path=path, - dist_name=label, - fingerprint=fingerprint, - ) - - def get_or_create_zipapp_direct(self, fp_dist, output_file): # type: ignore[return] - # type: (FingerprintedDist, OutputFile) -> str - # FIXME: should not need to set cwd!!! - with working_dir(fp_dist.source_path): - crawl_spec = MedusaCrawl(["."], self._pyc_tmp_ignores) - crawl_result = crawl_spec.crawl_paths_sync() - - mtime_behavior = ModifiedTimeBehavior.automatic(AutomaticModifiedTimeStrategy.Reproducible) - # TODO: consider setting compression level? - zip_options = ZipOutputOptions(mtime_behavior) - - entry_modifications = EntryModifications( - silent_external_prefix=".deps", - own_prefix=fp_dist.dist_name, - ) - - medusa_zip = crawl_result.medusa_zip(zip_options, entry_modifications) - - with DestinationBehavior.AlwaysTruncate.initialize_sync(output_file) as output: - return str(medusa_zip.zip_sync(output).output_path) - - def _extract_zip_input( - self, - chroot, # type: Chroot - labels, # type: Iterable[Label] - exclude_file=lambda _: False, # type: Callable[[str], bool] - strip_prefix=None, # type: Optional[str] - ): - # type: (...) -> Iterable[FileSource] - source_files = itertools.chain.from_iterable( - chroot.filesets.get(label, ()) for label in labels - ) - - # NB: we are not crawling the filesystem ourselves at all here, and instead relying on - # the bookkeeping we have already done within pex. - return [ - FileSource( - name=EntryName(os.path.relpath(p, strip_prefix) if strip_prefix else p), - source=os.path.join(chroot.chroot, p), - ) - for p in source_files - if not exclude_file(p) - ] - - def medusa_zip_sources_with_known_deps( # type: ignore[return] - self, - chroot, # type: Chroot - output_file, # type: OutputFile - known_deps, # type: List[Tuple[Label, MergeInput]] - mode="w", # type: str - deterministic_timestamp=False, # type: bool - exclude_file=lambda _: False, # type: Callable[[str], bool] - strip_prefix=None, # type: Optional[str] - labels=None, # type: Optional[Iterable[Optional[str]]] - compress=True, # type: bool - ): - # type: (...) -> str - merge_deps = dict(known_deps) # type: Dict[Label, MergeInput] - - known_labels = frozenset(merge_deps.keys()) # type: FrozenSet[Label] - - # Extract all labels if not more narrowly specified. 
- selected_labels = frozenset( - Label(l) for l in (labels or chroot.labels()) - ) # type: FrozenSet[Label] - - source_labels = selected_labels - known_labels # type: FrozenSet[Label] - - sorted_deps = sorted(known_labels) - merge_group = MergeGroup( - prefix=EntryName(".deps"), - sources=[merge_deps[l] for l in sorted_deps], - ) - merge_spec = MedusaMerge(groups=[merge_group]) - - zip_input = self._extract_zip_input( - chroot, - sorted(source_labels), - exclude_file=exclude_file, - strip_prefix=strip_prefix, - ) - - if deterministic_timestamp: - automatic = AutomaticModifiedTimeStrategy.Reproducible - else: - automatic = AutomaticModifiedTimeStrategy.PreserveSourceTime - mtime_behavior = ModifiedTimeBehavior.automatic(automatic) - if compress: - compression_options = CompressionOptions.default() - else: - compression_options = CompressionOptions( - method=CompressionMethod.Stored, - level=None, - ) - zip_options = ZipOutputOptions(mtime_behavior, compression_options) - - zip_spec = MedusaZip(input_files=zip_input, zip_options=zip_options) - - if mode == "a": - behavior = DestinationBehavior.AppendToNonZip - else: - assert mode == "w" - behavior = DestinationBehavior.AlwaysTruncate - - with behavior.initialize_sync(output_file) as output: - output = zip_spec.zip_sync(output) - return str(merge_spec.merge_sync(mtime_behavior, output).output_path) diff --git a/pex/pex_builder.py b/pex/pex_builder.py index dd2309099..35b9416e6 100644 --- a/pex/pex_builder.py +++ b/pex/pex_builder.py @@ -3,7 +3,6 @@ from __future__ import absolute_import -import functools import hashlib import logging import os @@ -31,7 +30,7 @@ from pex.environment import PEXEnvironment from pex.finders import get_entry_point_from_console_script, get_script_from_distributions from pex.interpreter import PythonInterpreter -from pex.jobs import Job, Raise, SpawnedJob, execute_parallel +from pex.jobs import Job from pex.layout import Layout from pex.orderedset import OrderedSet from pex.pex import PEX @@ -45,10 +44,6 @@ if TYPE_CHECKING: from typing import Callable, Dict, List, Optional, Tuple - import attr # vendor:skip -else: - from pex.third_party import attr - class CopyMode(Enum["CopyMode.Value"]): class Value(Enum.Value): @@ -226,8 +221,8 @@ def __init__( self._frozen = False self._distributions = {} # type: Dict[str, Distribution] - if self._copy_mode == CopyMode.STITCHED: - self._stitched_zip_dist_cache = {} # type: IntermediateZipCache + if self._copy_mode == CopyMode.STITCH: + self._stitch_merge_todo = {} # type: Dict[str, str] def _ensure_unfrozen(self, name="Operation"): if self._frozen: @@ -448,12 +443,14 @@ def set_header(self, header): def _add_dist_dir(self, path, dist_name, fingerprint=None): # type: (str, str, Optional[str]) -> str target_dir = os.path.join(self._pex_info.internal_cache, dist_name) - if self._copy_mode in [CopyMode.STITCHED, CopyMode.SYMLINK]: + if self._copy_mode == CopyMode.SYMLINK: + self._copy_or_link(path, target_dir, label=dist_name) + elif self._copy_mode == CopyMode.STITCH: self._copy_or_link(path, target_dir, label=dist_name) # FIXME: The fingerprint is available for every dependency dist, but not the bootstrap # deps (pip/setuptools/wheel). 
- if self._copy_mode == CopyMode.MEDUSA and fingerprint: - self._zip_cache.cache_intermediate_dist(path, dist_name, fingerprint) + if fingerprint: + self._stitch_merge_todo[fingerprint] = path else: for root, _, files in os.walk(path): for f in files: @@ -461,8 +458,8 @@ def _add_dist_dir(self, path, dist_name, fingerprint=None): relpath = os.path.relpath(filename, path) target = os.path.join(target_dir, relpath) self._copy_or_link(filename, target, label=dist_name) - # FIXME: make this faster! - # FIXME: avoid running dir_hash over pip/setuptools/wheel provided during bootstrap! + # FIXME: avoid running dir_hash over pip/setuptools/wheel provided during bootstrap (the + # `fingerprint` is currently None)! return fingerprint or CacheHelper.dir_hash(path) def add_distribution( @@ -566,7 +563,7 @@ def _copy_or_link(self, src, dst, label=None): self._chroot.touch(dst, label) elif self._copy_mode == CopyMode.COPY: self._chroot.copy(src, dst, label) - elif self._copy_mode in [CopyMode.SYMLINK, CopyMode.MEDUSA]: + elif self._copy_mode in [CopyMode.SYMLINK, CopyMode.STITCH]: self._chroot.symlink(src, dst, label) else: assert self._copy_mode == CopyMode.LINK @@ -797,71 +794,52 @@ def zip_cache_dir(path): os.path.join(internal_cache, location), ) - def _build_zipapp_with_medusa_caching( + def _cache_dists_for_stitching( self, - filename, # type: str - pex_info, # type: PexInfo - mode, # type: str - deterministic_timestamp=False, # type: bool - compress=True, # type: bool - exclude_file=lambda _: False, # type: Callable[[str], bool] + deterministic_timestamp, # type: bool + compress, # type: bool ): - # type: (...) -> None - from pex.medusa import Label, MergeInput, OutputFile - - known_deps = [] # type: List[Tuple[Label, MergeInput]] - if pex_info.distributions: - with TRACER.timed("creating intermediate cached dists"): - for location, fingerprint in pex_info.distributions.items(): - cached_installed_wheel_zip_dir = os.path.join( - pex_info.pex_root, - "intermediate_wheels", - fingerprint, - ) - with atomic_directory(cached_installed_wheel_zip_dir) as atomic_zip_dir: - if not atomic_zip_dir.is_finalized(): - fp_dist = self._zip_cache.for_label(Label(location)) - assert fp_dist is not None - output_file = OutputFile( - os.path.join(atomic_zip_dir.work_dir, location), + # type: (...) 
-> Dict[str, str] + """A 'stitched' copy mode will merge in these files' contents verbatim.""" + merge_deps = {} # type: Dict[str, str] + if not self._pex_info.distributions: + return merge_deps + + with TRACER.timed("caching dists for stitched output"): + for dist_label, fingerprint in self._pex_info.distributions.items(): + cache_key = "{}-{}-{}".format( + fingerprint, + "compressed" if compress else "uncompressed", + "deterministic-timestamp" + if deterministic_timestamp + else "nondeterministic-timestamp", + ) + cached_zip = os.path.join( + self._pex_info.pex_root, + "stitched_dists", + cache_key, + "{}.zip".format(dist_label), + ) + with atomic_directory(os.path.dirname(cached_zip)) as atomic_zip_dir: + if not atomic_zip_dir.is_finalized(): + dist_location = self._stitch_merge_todo[fingerprint] + atomic_output_file = os.path.join( + atomic_zip_dir.work_dir, os.path.basename(cached_zip) + ) + with TRACER.timed( + "caching {} at {}".format(dist_label, atomic_output_file) + ): + self._chroot.zip( + atomic_output_file, + labels=(dist_label,), + deterministic_timestamp=deterministic_timestamp, + compress=compress, + exclude_file=is_pyc_temporary_file, ) - with TRACER.timed( - "creating intermediate {} at {}".format(location, output_file) - ): - f = self._zip_cache.get_or_create_zipapp_direct( - fp_dist, - output_file, - ) - assert f == output_file - final_output_file = os.path.join(cached_installed_wheel_zip_dir, location) - known_deps.append((Label(location), MergeInput(final_output_file))) - with TRACER.timed("creating source zip and merging deps"): - self._zip_cache.medusa_zip_sources_with_known_deps( - self._chroot, - OutputFile(filename), - known_deps, - mode=mode, - deterministic_timestamp=deterministic_timestamp, - exclude_file=exclude_file, - compress=compress, - ) + assert os.path.isfile(cached_zip) + merge_deps[dist_label] = cached_zip - def _build_zipapp_without_caching( - self, - filename, # type: str - mode, # type: str - deterministic_timestamp=False, # type: bool - compress=True, # type: bool - exclude_file=lambda _: False, # type: Callable[[str], bool] - ): - # type: (...) -> None - self._chroot.zip( - filename, - mode=mode, - deterministic_timestamp=deterministic_timestamp, - exclude_file=exclude_file, - compress=compress, - ) + return merge_deps def _build_zipapp( self, @@ -871,9 +849,6 @@ def _build_zipapp( ): # type: (...) -> None - pex_info = self._pex_info.copy() - pex_info.update(PexInfo.from_env()) - # (1) Generate a very short *binary* file with a shebang line. with safe_open(filename, "wb") as pexfile: assert os.path.getsize(pexfile.name) == 0 @@ -891,17 +866,46 @@ def _build_zipapp( # (2) Append the zip file's contents to the end of the shebang stub file. 
with TRACER.timed("Zipping PEX file."): - if self._copy_mode == CopyMode.MEDUSA: - self._build_zipapp_with_medusa_caching( - filename, - pex_info, - mode="a", + if self._copy_mode == CopyMode.STITCH: + merge_deps = self._cache_dists_for_stitching( deterministic_timestamp=deterministic_timestamp, compress=compress, - exclude_file=exclude_file, ) + uncached_labels = sorted( + frozenset(self._chroot.labels()) - frozenset(merge_deps.keys()) + ) + with TRACER.timed("zipping up uncached sources"): + self._chroot.zip( + filename, + mode="a", + deterministic_timestamp=deterministic_timestamp, + compress=compress, + labels=uncached_labels, + ) + with tempfile.NamedTemporaryFile(mode="w+b", suffix=".broken-zip") as tmp_concat: + with TRACER.timed("concatenating inputs"): + input_zips = [filename] + [ + merge_zip for _, merge_zip in + # Sort the cached zips by the prefixes of the filenames they'll be + # inserting into the merged result. + sorted(merge_deps.items(), key=lambda x: x[0]) + ] + for path in input_zips: + with open(path, "rb") as f: + shutil.copyfileobj(f, tmp_concat) # type: ignore[misc] + tmp_concat.flush() + + with TRACER.timed("fixing up concatenated inputs with zip -FF"): + command = ["zip", '-FF', tmp_concat.name, '--out', filename] + zip_proc = subprocess.Popen(command, + stderr=subprocess.PIPE, + ) + assert zip_proc.stdin is not None + job = Job(command, zip_proc) + shutil.copyfileobj(tmp_concat, zip_proc.stdin) # type: ignore[misc] + job.wait() else: - self._build_zipapp_without_caching( + self._chroot.zip( filename, mode="a", deterministic_timestamp=deterministic_timestamp, diff --git a/tests/test_pex_builder.py b/tests/test_pex_builder.py index 4c5b80db5..fd4aa23e1 100644 --- a/tests/test_pex_builder.py +++ b/tests/test_pex_builder.py @@ -34,13 +34,10 @@ """ -@pytest.mark.parametrize( - "copy_mode", [pytest.param(copy_mode, id=copy_mode.value) for copy_mode in CopyMode.values()] -) -def test_pex_builder_basic(copy_mode): - # type: (CopyMode.Value) -> None +def test_pex_builder_basic(): + # type: () -> None with temporary_dir() as td, make_bdist("p1") as p1: - pb = write_pex(td, exe_main, dists=[p1], copy_mode=copy_mode) + pb = write_pex(td, exe_main, dists=[p1]) success_txt = os.path.join(td, "success.txt") PEX(td, interpreter=pb.interpreter).run(args=[success_txt]) @@ -177,7 +174,7 @@ def tmp_chroot(tmpdir): pytest.param(copy_mode, id=copy_mode.value) for copy_mode in CopyMode.values() # FIXME - if copy_mode != CopyMode.MEDUSA + if copy_mode != CopyMode.STITCH ], ) def test_pex_builder_add_source_relpath_issues_1192( @@ -340,7 +337,7 @@ def test_pex_builder_packed(tmpdir): pytest.param(copy_mode, id=copy_mode.value) for copy_mode in CopyMode.values() # FIXME - if copy_mode != CopyMode.MEDUSA + if copy_mode != CopyMode.STITCH ], ) @pytest.mark.parametrize(