Skip to content

Commit

Permalink
Unify PEX buildtime and runtime wheel caches. (#821)
Browse files Browse the repository at this point in the history
Previously these caches were separate. Downloaded wheels and sdists were
cached in `~/.pex/build` and wheels unzipped from zipped pexes at
runtime were cached to `~/.pex/install`.

Now the caches are unified by the resolver such that any wheel installs
performed by it can be seen by zipped PEXes on the same machine when
they go to unzip the wheel distributions stored within them at PEX
boot time.

N.B.: The cache is not unified in the other direction. If a zipped PEX
is executed on the same machine a PEX build resolve later happens on,
any intersecting wheels will be re-downloaded, built and installed by
the resolve, finally unifying the caches from that point forward.

Fixes #820

Also add a test demonstrating "ipex".

The test demonstrates how to create a dehydrated pex that, upon first
execution, produces a hydrated pex that it hands control to from then
on. This is the motivating use case for the cache unification
change which prevents dehydrated pexes from performing wheel unzipping
twice - once during pex hydration (resolve) and once during hydrated pex
run.
  • Loading branch information
jsirois authored Dec 6, 2019
1 parent f796e00 commit 66aa5ef
Show file tree
Hide file tree
Showing 13 changed files with 488 additions and 208 deletions.
19 changes: 8 additions & 11 deletions pex/bin/pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ def configure_clp_pex_resolution(parser):
group.add_option(
'--cache-dir',
dest='cache_dir',
default='{pex_root}/build',
default='{pex_root}',
help='The local cache directory to use for speeding up requirement '
'lookups. [Default: ~/.pex/build]')
'lookups. [Default: ~/.pex]')

group.add_option(
'--wheel', '--no-wheel', '--no-use-wheel',
Expand Down Expand Up @@ -458,7 +458,7 @@ def configure_clp():
parser.add_option(
'--pex-root',
dest='pex_root',
default=None,
default=ENV.PEX_ROOT,
help='Specify the pex root used in this invocation of pex. [Default: ~/.pex]'
)

Expand Down Expand Up @@ -625,16 +625,13 @@ def main(args=None):
if options.python and options.interpreter_constraint:
die('The "--python" and "--interpreter-constraint" options cannot be used together.')

if options.pex_root:
ENV.set('PEX_ROOT', options.pex_root)
else:
options.pex_root = ENV.PEX_ROOT # If option not specified fallback to env variable.
with ENV.patch(PEX_VERBOSE=str(options.verbosity),
PEX_ROOT=options.pex_root) as patched_env:

# Don't alter cache if it is disabled.
if options.cache_dir:
options.cache_dir = make_relative_to_root(options.cache_dir)
# Don't alter cache if it is disabled.
if options.cache_dir:
options.cache_dir = make_relative_to_root(options.cache_dir)

with ENV.patch(PEX_VERBOSE=str(options.verbosity)) as patched_env:
with TRACER.timed('Building pex'):
pex_builder = build_pex(reqs, options)

Expand Down
86 changes: 76 additions & 10 deletions pex/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import time
import zipfile
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime
from uuid import uuid4

Expand Down Expand Up @@ -217,19 +218,84 @@ def safe_sleep(seconds):
current_time = time.time()


class AtomicDirectory(object):
  """A directory that is populated in a scratch location and then atomically renamed into
  its final `target_dir` location.

  The scratch `work_dir` is created as a sibling of `target_dir` (the target path plus a
  uuid suffix) so that the final `os.rename` never crosses a filesystem boundary and is
  therefore atomic per POSIX.
  """

  def __init__(self, target_dir):
    self._target_dir = target_dir
    # Sibling of the target dir to satisfy the single-filesystem requirement of an
    # atomic `os.rename`.
    self._work_dir = '{}.{}'.format(target_dir, uuid4().hex)

  @property
  def work_dir(self):
    # The scratch directory to populate before calling `finalize`.
    return self._work_dir

  @property
  def target_dir(self):
    # The final destination directory of the atomic update.
    return self._target_dir

  @property
  def is_finalized(self):
    # Existence of the target dir signals a completed finalize, possibly performed by a
    # racing process.
    return os.path.exists(self._target_dir)

  def finalize(self, source=None):
    """Rename `work_dir` to `target_dir` using `os.rename()`.

    :param str source: An optional source offset into the `work_dir` to use for the atomic
                       update of `target_dir`. By default the whole `work_dir` is used.

    If a race is lost and `target_dir` already exists, the `target_dir` dir is left unchanged
    and the `work_dir` directory will simply be removed.
    """
    if self.is_finalized:
      return

    source = os.path.join(self._work_dir, source) if source else self._work_dir
    try:
      # Perform an atomic rename.
      #
      # Per the docs: https://docs.python.org/2.7/library/os.html#os.rename
      #
      #   The operation may fail on some Unix flavors if src and dst are on different
      #   filesystems. If successful, the renaming will be an atomic operation (this is a
      #   POSIX requirement).
      #
      # We have satisfied the single filesystem constraint by arranging the `work_dir` to be
      # a sibling of the `target_dir`.
      os.rename(source, self._target_dir)
    except OSError as e:
      # EEXIST / ENOTEMPTY mean we lost a race with another finalizer; the winner's content
      # is kept and our work dir is discarded below.
      if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
        raise  # Bare raise preserves the original traceback (unlike `raise e`).
    finally:
      self.cleanup()

  def cleanup(self):
    # Best-effort removal of the scratch dir; a no-op if it was never created or already
    # renamed away.
    safe_rmtree(self._work_dir)


@contextmanager
def atomic_directory(target_dir, source=None):
  """A context manager that yields a new empty work directory path it will move to `target_dir`.

  :param str target_dir: The target directory to atomically update.
  :param str source: An optional source offset into the work directory to use for the atomic
                     update of the target directory. By default the whole work directory is
                     used.

  If the `target_dir` already exists the enclosed block will be yielded `None` to signal
  there is no work to do.

  If the enclosed block fails the `target_dir` will be undisturbed.

  The new work directory will be cleaned up regardless of whether or not the enclosed block
  succeeds.
  """
  atomic_dir = AtomicDirectory(target_dir=target_dir)
  if atomic_dir.is_finalized:
    # Another process (or a prior run) already populated the target; signal no work to do.
    yield None
    return

  safe_mkdir(atomic_dir.work_dir)
  try:
    yield atomic_dir.work_dir
    # Only reached if the enclosed block completed without raising.
    atomic_dir.finalize(source=source)
  finally:
    atomic_dir.cleanup()


def chmod_plus_x(path):
Expand Down
62 changes: 17 additions & 45 deletions pex/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@
import os
import site
import sys
import uuid
import zipfile
from collections import OrderedDict

from pex import pex_builder, pex_warnings
from pex.bootstrap import Bootstrap
from pex.common import die, open_zip, rename_if_empty, safe_mkdir, safe_rmtree
from pex.common import atomic_directory, die, open_zip
from pex.interpreter import PythonInterpreter
from pex.package import distribution_compatible
from pex.pex_info import PexInfo
from pex.platforms import Platform
from pex.third_party.pkg_resources import (
DistributionNotFound,
Expand All @@ -27,7 +25,7 @@
find_distributions
)
from pex.tracer import TRACER
from pex.util import CacheHelper, DistributionHelper
from pex.util import CacheHelper


def _import_pkg_resources():
Expand Down Expand Up @@ -111,21 +109,14 @@ def _force_local(cls, pex_file, pex_info):
return pex_file
explode_dir = os.path.join(pex_info.zip_unsafe_cache, pex_info.code_hash)
TRACER.log('PEX is not zip safe, exploding to %s' % explode_dir)
if not os.path.exists(explode_dir):
explode_tmp = explode_dir + '.' + uuid.uuid4().hex
with TRACER.timed('Unzipping %s' % pex_file):
try:
safe_mkdir(explode_tmp)
with atomic_directory(explode_dir) as explode_tmp:
if explode_tmp:
with TRACER.timed('Unzipping %s' % pex_file):
with open_zip(pex_file) as pex_zip:
pex_files = (x for x in pex_zip.namelist()
if not x.startswith(pex_builder.BOOTSTRAP_DIR) and
not x.startswith(PexInfo.INTERNAL_CACHE))
not x.startswith(pex_info.internal_cache))
pex_zip.extractall(explode_tmp, pex_files)
except: # noqa: T803
safe_rmtree(explode_tmp)
raise
TRACER.log('Renaming %s to %s' % (explode_tmp, explode_dir))
rename_if_empty(explode_tmp, explode_dir)
return explode_dir

@classmethod
Expand Down Expand Up @@ -162,34 +153,14 @@ def _update_module_paths(cls, pex_file):
reimported_module.__path__.append(path_item)

@classmethod
def _write_zipped_internal_cache(cls, zf, pex_info):
  """Extract each distribution bundled in a zipped pex into the shared install cache.

  :param zf: An open zipfile over the pex being booted.
  :param pex_info: The `PexInfo` for the pex; its `distributions` map of
                   distribution name -> digest drives the extraction.
  :returns: A list of the cached `Distribution` objects.
  """
  cached_distributions = []
  for distribution_name, dist_digest in pex_info.distributions.items():
    internal_dist_path = '/'.join([pex_info.internal_cache, distribution_name])
    # Keying the cache location on the distribution digest ensures differing builds of the
    # same distribution never collide.
    cached_location = os.path.join(pex_info.install_cache, dist_digest, distribution_name)
    # NOTE(review): `CacheHelper.cache_distribution` is assumed to be a no-op when the
    # cached location already exists -- confirm against pex/util.py.
    dist = CacheHelper.cache_distribution(zf, internal_dist_path, cached_location)
    cached_distributions.append(dist)
  return cached_distributions

@classmethod
def _load_internal_cache(cls, pex, pex_info):
Expand All @@ -200,8 +171,9 @@ def _load_internal_cache(cls, pex, pex_info):
for dist in find_distributions(internal_cache):
yield dist
else:
for dist in itertools.chain(*cls._write_zipped_internal_cache(pex, pex_info)):
yield dist
with open_zip(pex) as zf:
for dist in cls._write_zipped_internal_cache(zf, pex_info):
yield dist

def __init__(self, pex, pex_info, interpreter=None, **kw):
self._internal_cache = os.path.join(pex, pex_info.internal_cache)
Expand Down
49 changes: 12 additions & 37 deletions pex/pex_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
import os

from pex.common import Chroot, chmod_plus_x, open_zip, safe_mkdir, safe_mkdtemp
from pex.common import Chroot, chmod_plus_x, safe_mkdir, safe_mkdtemp, temporary_dir
from pex.compatibility import to_bytes
from pex.compiler import Compiler
from pex.distribution_target import DistributionTarget
Expand Down Expand Up @@ -276,37 +276,14 @@ def _add_dist_dir(self, path, dist_name):
self._copy_or_link(filename, target)
return CacheHelper.dir_hash(path)

def _add_dist_wheel_file(self, path, dist_name):
  """Install the wheel file at `path` and add its installed contents to the chroot.

  Wheels are not guaranteed to be importable in-place, so the wheel is first installed
  into an importable directory layout and that directory is then added.

  :param path: Path to the `.whl` file on disk.
  :param dist_name: The name to store the distribution under in the internal cache.
  :returns: The hash of the installed distribution directory.
  """
  with temporary_dir() as install_dir:
    # The install runs in a subprocess; wait for completion before reading the result.
    spawn_install_wheel(
      wheel=path,
      install_dir=install_dir,
      target=DistributionTarget.for_interpreter(self.interpreter)
    ).wait()
    # This must happen inside the `with` block: `install_dir` is deleted on exit.
    return self._add_dist_dir(install_dir, dist_name)

def _prepare_code_hash(self):
self._pex_info.code_hash = CacheHelper.pex_hash(self._chroot.path())
Expand All @@ -325,8 +302,11 @@ def add_distribution(self, dist, dist_name=None):

if os.path.isdir(dist.location):
dist_hash = self._add_dist_dir(dist.location, dist_name)
elif dist.location.endswith('.whl'):
dist_hash = self._add_dist_wheel_file(dist.location, dist_name)
else:
dist_hash = self._add_dist_zip(dist.location, dist_name)
raise self.InvalidDistribution('Unsupported distribution type: {}, pex can only accept dist '
'dirs and wheels.'.format(dist))

# add dependency key so that it can rapidly be retrieved from cache
self._pex_info.add_distribution(dist_name, dist_hash)
Expand All @@ -347,14 +327,9 @@ def add_dist_location(self, dist, name=None):
bdist = DistributionHelper.distribution_from_path(dist)
if bdist is None:
raise self.InvalidDistribution('Could not find distribution at %s' % dist)
self.add_distribution(bdist)
self.add_distribution(bdist, dist_name=name)
self.add_requirement(bdist.as_requirement())

def add_egg(self, egg):
  """Alias for add_dist_location.

  :param egg: The path of an egg distribution to add to the pex environment.
  :returns: Whatever `add_dist_location` returns for `egg`.
  """
  self._ensure_unfrozen('Adding an egg')
  return self.add_dist_location(egg)

def _precompile_source(self):
source_relpaths = [path for label in ('source', 'executable', 'main', 'bootstrap')
for path in self._chroot.filesets.get(label, ()) if path.endswith('.py')]
Expand Down
6 changes: 3 additions & 3 deletions pex/pex_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PexInfo(object):
"""

PATH = 'PEX-INFO'
INTERNAL_CACHE = '.deps'
INSTALL_CACHE = 'installed_wheels'

@classmethod
def make_build_properties(cls, interpreter=None):
Expand Down Expand Up @@ -290,11 +290,11 @@ def pex_root(self, value):

@property
def internal_cache(self):
  """The pex-relative directory name under which bundled distributions are stored."""
  return '.deps'

@property
def install_cache(self):
  """The absolute path of the wheel install cache under the pex root.

  This cache is shared between build-time resolves and runtime unzipping of zipped pexes,
  so wheels installed by either side are visible to the other.
  """
  return os.path.join(self.pex_root, self.INSTALL_CACHE)

@property
def zip_unsafe_cache(self):
Expand Down
Loading

0 comments on commit 66aa5ef

Please sign in to comment.