populate requirements cache wheels for linux_x86_64
AnandInguva committed Jan 27, 2022
1 parent fd73461 commit e310036
Showing 3 changed files with 186 additions and 24 deletions.
16 changes: 15 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
@@ -1053,7 +1053,21 @@ def _add_argparse_args(cls, parser):
default=None,
help=(
'Path to a folder to cache the packages specified in '
'the requirements file using the --requirements_file option.'))
'the requirements file using the --requirements_file option. '
'If you want to skip populating the requirements cache, please '
'specify --requirements_cache skip. This would install all '
'the packages from the requirements file on the worker.'))
parser.add_argument(
'--requirements_cache_only_sources',
action='store_true',
help=(
'Enable this flag to populate the requirements cache with source '
'distributions (sdists) of the dependencies mentioned in the '
'--requirements_file. '
'Note: this step would slow down the worker startup time. '
'By default, the requirements cache will be populated with '
'binary distributions (bdists/wheels), assuming the platform on '
'the --sdk_container_image (if provided) is x86_64.'))
parser.add_argument(
'--setup_file',
default=None,
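
For reference, a minimal sketch of how the two options added above can be set. The flag names come from this diff; everything else (the requirements file name, the assertions) is illustrative:

    # Hedged sketch: wiring up the new flags via PipelineOptions.
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    # Skip populating the requirements cache; the packages listed in the
    # requirements file are then installed directly on the worker.
    skip_options = PipelineOptions([
        '--requirements_file=requirements.txt',
        '--requirements_cache=skip',
    ])
    assert skip_options.view_as(SetupOptions).requirements_cache == 'skip'

    # Keep the cache, but populate it with source distributions (sdists) only.
    sdist_options = PipelineOptions([
        '--requirements_file=requirements.txt',
        '--requirements_cache_only_sources',
    ])
    assert sdist_options.view_as(SetupOptions).requirements_cache_only_sources
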
87 changes: 67 additions & 20 deletions sdks/python/apache_beam/runners/portability/stager.py
@@ -54,6 +54,7 @@
import shutil
import sys
import tempfile
from typing import Callable
from typing import List
from typing import Optional
from typing import Tuple
@@ -82,6 +83,8 @@
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
# One of the choices the user can pass for --requirements_cache during staging
SKIP_REQUIREMENTS_CACHE = 'skip'

_LOGGER = logging.getLogger(__name__)

@@ -162,7 +165,7 @@ def create_job_resources(options, # type: PipelineOptions
temp_dir, # type: str
build_setup_args=None, # type: Optional[List[str]]
pypi_requirements=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type: Optional[str]
populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]]
skip_prestaged_dependencies=False, # type: Optional[bool]
):
"""For internal use only; no backwards-compatibility guarantees.
@@ -197,6 +200,8 @@ def create_job_resources(options, # type: PipelineOptions
resources = [] # type: List[beam_runner_api_pb2.ArtifactInformation]

setup_options = options.view_as(SetupOptions)
use_beam_default_container = options.view_as(
WorkerOptions).sdk_container_image is None

pickler.set_library(setup_options.pickle_library)

@@ -205,8 +210,9 @@ def create_job_resources(options, # type: PipelineOptions
# if we know we are using a dependency pre-installed sdk container image.
if not skip_prestaged_dependencies:
requirements_cache_path = (
os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
if setup_options.requirements_cache is None else
os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache') if
(setup_options.requirements_cache is None) and
(setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) else
setup_options.requirements_cache)
if not os.path.exists(requirements_cache_path):
os.makedirs(requirements_cache_path)
@@ -223,10 +229,20 @@
setup_options.requirements_file, REQUIREMENTS_FILE))
# Populate cache with packages from the requirement file option and
# stage the files in the cache.
(
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache)(
setup_options.requirements_file, requirements_cache_path)
if not use_beam_default_container:
_LOGGER.warning(
'When using a custom container image, prefer installing'
' additional PyPI dependencies directly into the image,'
' instead of specifying them via runtime options, '
'such as --requirements_file. ')

if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
(
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache)(
setup_options.requirements_file,
requirements_cache_path,
setup_options.requirements_cache_only_sources)

if pypi_requirements:
tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
@@ -235,12 +251,16 @@
resources.append(Stager._create_file_pip_requirements_artifact(tf.name))
# Populate cache with packages from PyPI requirements and stage
# the files in the cache.
(
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache)(
tf.name, requirements_cache_path)

if setup_options.requirements_file is not None or pypi_requirements:
if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
(
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache)(
tf.name,
requirements_cache_path,
setup_options.requirements_cache_only_sources)

if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) and (
setup_options.requirements_file is not None or pypi_requirements):
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
resources.append(
Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
@@ -398,7 +418,7 @@ def create_and_stage_job_resources(
build_setup_args=None, # type: Optional[List[str]]
temp_dir=None, # type: Optional[str]
pypi_requirements=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type: Optional[str]
populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]]
staging_location=None # type: Optional[str]
):
"""For internal use only; no backwards-compatibility guarantees.
@@ -644,7 +664,7 @@ def _get_python_executable():
return python_bin

@staticmethod
def remove_dependency_from_requirements(
def _remove_dependency_from_requirements(
requirements_file, # type: str
dependency_to_remove, # type: str
temp_directory_path):
@@ -663,10 +683,21 @@ def remove_dependency_from_requirements(

return tmp_requirements_filename

@staticmethod
def _get_manylinux_distribution():
# TODO(anandinguva): When https://github.com/pypa/pip/issues/10760 is
# addressed, download the wheel based on the glibc version in Beam's
# Python base image.
pip_version = pkg_resources.get_distribution('pip').version
if float(pip_version[0:4]) >= 19.3:
return 'manylinux2014_x86_64'
else:
return 'manylinux2010_x86_64'

@staticmethod
@retry.with_exponential_backoff(
num_retries=4, retry_filter=retry_on_non_zero_exit)
def _populate_requirements_cache(requirements_file, cache_dir):
def _populate_requirements_cache(
requirements_file, cache_dir, populate_cache_with_sdists=False):
# The 'pip download' command will not download again if it finds the
# tarball with the proper version already present.
# It will get the packages downloaded in the order they are presented in
@@ -675,10 +706,11 @@ def _populate_requirements_cache(requirements_file, cache_dir):
# The apache-beam dependency is excluded from requirements cache population
# because we stage the SDK separately.
with tempfile.TemporaryDirectory() as temp_directory:
tmp_requirements_filepath = Stager.remove_dependency_from_requirements(
tmp_requirements_filepath = Stager._remove_dependency_from_requirements(
requirements_file=requirements_file,
dependency_to_remove='apache-beam',
temp_directory_path=temp_directory)

cmd_args = [
Stager._get_python_executable(),
'-m',
Expand All @@ -690,10 +722,25 @@ def _populate_requirements_cache(requirements_file, cache_dir):
tmp_requirements_filepath,
'--exists-action',
'i',
# Download from PyPI source distributions.
'--no-binary',
':all:'
'--no-deps'
]

if populate_cache_with_sdists:
cmd_args.extend(['--no-binary', ':all:'])
else:
language_implementation_tag = 'cp'
abi_suffix = 'm' if sys.version_info < (3, 8) else ''
abi_tag = 'cp%d%d%s' % (
sys.version_info[0], sys.version_info[1], abi_suffix)
platform_tag = Stager._get_manylinux_distribution()
cmd_args.extend([
'--implementation',
language_implementation_tag,
'--abi',
abi_tag,
'--platform',
platform_tag
])
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)

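
As a rough sketch, the tag selection added above reduces to the following; CPython is assumed, and the pip command in the trailing comment mirrors cmd_args with placeholder paths:

    # Hedged sketch of the wheel-tag selection used when populating the cache.
    import sys
    import pkg_resources

    abi_suffix = 'm' if sys.version_info < (3, 8) else ''
    abi_tag = 'cp%d%d%s' % (sys.version_info[0], sys.version_info[1], abi_suffix)

    # Mirrors Stager._get_manylinux_distribution(): pip >= 19.3 understands
    # manylinux2014 wheels; older pip falls back to manylinux2010.
    pip_version = pkg_resources.get_distribution('pip').version
    platform_tag = (
        'manylinux2014_x86_64'
        if float(pip_version[0:4]) >= 19.3 else 'manylinux2010_x86_64')

    # With CPython 3.8 and recent pip, cmd_args is roughly equivalent to:
    #   python -m pip download --dest <cache_dir> -r <requirements.txt> \
    #       --exists-action i --no-deps \
    #       --implementation cp --abi cp38 --platform manylinux2014_x86_64
    print(abi_tag, platform_tag)
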
107 changes: 104 additions & 3 deletions sdks/python/apache_beam/runners/portability/stager_test.py
@@ -87,7 +87,8 @@ def file_copy(self, from_path, to_path):
else:
shutil.copyfile(from_path, to_path)

def populate_requirements_cache(self, requirements_file, cache_dir):
def populate_requirements_cache(
self, requirements_file, cache_dir, populate_cache_with_sdists=False):
_ = requirements_file
self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
@@ -342,6 +343,47 @@ def test_with_requirements_file_and_cache(self):
self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt')))

def test_with_requirements_file_skipping_cache(self):
staging_dir = self.make_temp_dir()
source_dir = self.make_temp_dir()

options = PipelineOptions()
self.update_options(options)
options.view_as(SetupOptions).requirements_file = os.path.join(
source_dir, stager.REQUIREMENTS_FILE)
options.view_as(
SetupOptions).requirements_cache = stager.SKIP_REQUIREMENTS_CACHE
self.create_temp_file(
os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')

resources = self.stager.create_and_stage_job_resources(
options,
populate_requirements_cache=self.populate_requirements_cache,
staging_location=staging_dir)[1]

self.assertEqual([stager.REQUIREMENTS_FILE], resources)
self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'def.txt')))

def test_with_pypi_requirements_skipping_cache(self):
staging_dir = self.make_temp_dir()

options = PipelineOptions()
self.update_options(options)
options.view_as(
SetupOptions).requirements_cache = stager.SKIP_REQUIREMENTS_CACHE

resources = self.stager.create_and_stage_job_resources(
options,
pypi_requirements=['nothing>=1.0,<2.0'],
populate_requirements_cache=self.populate_requirements_cache,
staging_location=staging_dir)[1]
with open(os.path.join(staging_dir, resources[0])) as f:
data = f.read()
self.assertEqual('nothing>=1.0,<2.0', data)
self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'abc.txt')))
self.assertTrue(not os.path.isfile(os.path.join(staging_dir, 'def.txt')))

def test_setup_file_not_present(self):
staging_dir = self.make_temp_dir()

@@ -705,15 +747,15 @@ def test_remove_dependency_from_requirements(self):
for i in range(len(requirements)):
f.write(requirements[i])

tmp_req_filename = self.stager.remove_dependency_from_requirements(
tmp_req_filename = self.stager._remove_dependency_from_requirements(
requirements_file=os.path.join(requirements_cache_dir, 'abc.txt'),
dependency_to_remove='apache_beam',
temp_directory_path=requirements_cache_dir)
with open(tmp_req_filename, 'r') as tf:
lines = tf.readlines()
self.assertEqual(['avro-python3\n', 'fastavro\n', 'numpy\n'], sorted(lines))

tmp_req_filename = self.stager.remove_dependency_from_requirements(
tmp_req_filename = self.stager._remove_dependency_from_requirements(
requirements_file=os.path.join(requirements_cache_dir, 'abc.txt'),
dependency_to_remove='fastavro',
temp_directory_path=requirements_cache_dir)
@@ -723,6 +765,65 @@
self.assertEqual(['apache_beam\n', 'avro-python3\n', 'numpy\n'],
sorted(lines))

def _create_file(
self, requirements_file, temp_dir, populate_cache_with_sdists):
if not populate_cache_with_sdists:
self.create_temp_file(os.path.join(temp_dir, 'nothing.whl'), 'Fake whl')
self.create_temp_file(
os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')
else:
self.create_temp_file(
os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')

def test_populate_requirements_cache_with_sdist(self):
staging_dir = self.make_temp_dir()
requirements_cache_dir = self.make_temp_dir()
source_dir = self.make_temp_dir()

options = PipelineOptions()
self.update_options(options)

options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
options.view_as(SetupOptions).requirements_file = os.path.join(
source_dir, stager.REQUIREMENTS_FILE)
self.create_temp_file(
os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
# For the default container image, the sdk_container_image option would be None.
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._populate_requirements_cache',
staticmethod(self._create_file)):
options.view_as(SetupOptions).requirements_cache_only_sources = False
resources = self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1]
for f in resources:
if f != stager.REQUIREMENTS_FILE:
self.assertTrue(('.tar.gz' in f) or ('.whl' in f))

def test_populate_requirements_cache_with_bdist(self):
staging_dir = self.make_temp_dir()
requirements_cache_dir = self.make_temp_dir()
source_dir = self.make_temp_dir()

options = PipelineOptions()
self.update_options(options)

options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
options.view_as(SetupOptions).requirements_file = os.path.join(
source_dir, stager.REQUIREMENTS_FILE)
self.create_temp_file(
os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._populate_requirements_cache',
staticmethod(self._create_file)):
options.view_as(SetupOptions).requirements_cache_only_sources = True
resources = self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1]

for f in resources:
if f != stager.REQUIREMENTS_FILE:
self.assertTrue('.tar.gz' in f)
self.assertTrue('.whl' not in f)


class TestStager(stager.Stager):
def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):
