[BEAM-4032] Support staging binary distributions of dependency packages #16633

Merged · 23 commits · Feb 11, 2022

Commits
e310036
populate requirements cache wheels for linux_x86_64
AnandInguva Jan 27, 2022
29beb77
Don't return error and let the second pip install through
AnandInguva Jan 27, 2022
cd4cda2
Update sdks/python/apache_beam/options/pipeline_options.py
AnandInguva Jan 29, 2022
d0c52f6
Refactoring tests and stager.py
AnandInguva Jan 30, 2022
7a9a6f0
Merge remote-tracking branch 'origin/stage-wheels' into stage-wheels
AnandInguva Jan 30, 2022
ec99a96
Integration test with --requirements_file
AnandInguva Feb 3, 2022
b0cb013
Change message when the first pip install fails in boot.go
AnandInguva Feb 4, 2022
3fa9480
Refactor code
AnandInguva Feb 4, 2022
0dcd5fc
Add license
AnandInguva Feb 4, 2022
e9f1124
Rename: remove extra underscore in method name
AnandInguva Feb 9, 2022
0eb0674
IT to check if a package installed on worker container from requireme…
AnandInguva Feb 9, 2022
d95f2d0
resolve conflicts
AnandInguva Feb 9, 2022
6fe6d4b
Revert "resolve conflicts"
AnandInguva Feb 9, 2022
1d5f8b5
Fixup lint
AnandInguva Feb 9, 2022
af9b886
Refactor integration test.
AnandInguva Feb 9, 2022
bccc7b8
fixup: lint
AnandInguva Feb 9, 2022
3df1666
Merge remote-tracking branch 'upstream/master' into stage-wheels
AnandInguva Feb 10, 2022
b9a2751
Merge remote-tracking branch 'upstream/master' into stage-wheels
AnandInguva Feb 10, 2022
ebaaed0
Add staging wheels feature to the CHANGES.md
AnandInguva Feb 10, 2022
9f5f0ee
fixup lint
AnandInguva Feb 10, 2022
aa7af77
Refactor integration test
AnandInguva Feb 10, 2022
4d36d80
Add description to CHANGES.md
AnandInguva Feb 10, 2022
9b3a04a
Fixup lint
AnandInguva Feb 10, 2022
4 changes: 2 additions & 2 deletions CHANGES.md
@@ -62,8 +62,8 @@

## New Features / Improvements

-* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-
+* Pipeline dependencies supplied through `--requirements_file` will now be staged to the runner using binary distributions (wheels) of the PyPI packages, built for the linux_x86_64 platform ([BEAM-4032](https://issues.apache.org/jira/browse/BEAM-4032)).
+  To restore the previous behavior of using source distributions, set the pipeline option `--requirements_cache_only_sources`. To skip staging the packages at submission time, set the pipeline option `--requirements_cache=skip` (Python).
## Breaking Changes

* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
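As a usage sketch (assuming Beam 2.37.0 or later; the requirements file name is arbitrary), the new default and the two opt-outs look like this:

from apache_beam.options.pipeline_options import PipelineOptions

# Default: the requirements cache is populated with linux_x86_64 wheels.
wheel_opts = PipelineOptions(['--requirements_file=requirements.txt'])

# Pre-2.37.0 behavior: populate the cache with source distributions only.
sdist_opts = PipelineOptions([
    '--requirements_file=requirements.txt',
    '--requirements_cache_only_sources',
])

# Skip populating the requirements cache at submission time.
skip_opts = PipelineOptions([
    '--requirements_file=requirements.txt',
    '--requirements_cache=skip',
])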
15 changes: 14 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
@@ -1053,7 +1053,20 @@ def _add_argparse_args(cls, parser):
default=None,
help=(
'Path to a folder to cache the packages specified in '
-            'the requirements file using the --requirements_file option.'))
+            'the requirements file using the --requirements_file option. '
+            'If you want to skip populating the requirements cache, '
+            'specify --requirements_cache="skip".'))
+    parser.add_argument(
+        '--requirements_cache_only_sources',
+        action='store_true',
+        help=(
+            'Enable this flag to populate the requirements cache only '
+            'with source distributions (sdists) of the dependencies '
+            'mentioned in the --requirements_file. '
+            'Note (BEAM-4032): this flag may significantly slow down '
+            'pipeline submission. It is added to preserve the requirements '
+            'cache behavior prior to 2.37.0 and will likely be removed in '
+            'future releases.'))
parser.add_argument(
'--setup_file',
default=None,
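For reference, a small sketch of how the parsed flags surface on SetupOptions (option names as defined above; the printed values assume exactly these arguments):

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

setup_opts = PipelineOptions([
    '--requirements_file=requirements.txt',
    '--requirements_cache=skip',
]).view_as(SetupOptions)

print(setup_opts.requirements_file)   # requirements.txt
print(setup_opts.requirements_cache)  # skip
print(setup_opts.requirements_cache_only_sources)  # False (store_true default)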
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A pipeline to verify the installation of packages specified in the
requirements.txt. A requirements text is created during runtime with
package specified in _PACKAGE_IN_REQUIREMENTS_FILE.
"""

import argparse
import logging
import os
import shutil
import tempfile

import pkg_resources as pkg

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

_PACKAGE_IN_REQUIREMENTS_FILE = ['matplotlib', 'seaborn']


def verify_packages_from_requirements_file_are_installed(unused_element):
_PACKAGE_NOT_IN_REQUIREMENTS_FILE = ['torch']
packages_to_test = _PACKAGE_IN_REQUIREMENTS_FILE + (
_PACKAGE_NOT_IN_REQUIREMENTS_FILE)
for package_name in packages_to_test:
try:
output = pkg.get_distribution(package_name)
except pkg.DistributionNotFound as e: # pylint: disable=unused-variable
output = None
if package_name in _PACKAGE_IN_REQUIREMENTS_FILE:
assert output is not None, ('Please check if package %s is specified'
' in requirements file' % package_name)
if package_name in _PACKAGE_NOT_IN_REQUIREMENTS_FILE:
assert output is None


def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
_, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
temp_dir = tempfile.mkdtemp()
requirements_text_path = os.path.join(temp_dir, 'requirements.txt')
with open(requirements_text_path, 'w') as f:
f.write('\n'.join(_PACKAGE_IN_REQUIREMENTS_FILE))
pipeline_options.view_as(
SetupOptions).requirements_file = requirements_text_path

with beam.Pipeline(options=pipeline_options) as p:
( # pylint: disable=expression-not-assigned
p
| beam.Create([None])
| beam.Map(verify_packages_from_requirements_file_are_installed))
shutil.rmtree(temp_dir)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
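The check above relies on pkg_resources raising DistributionNotFound for packages that are not installed; a self-contained sketch of the same probe (package names here are illustrative):

import pkg_resources

def is_installed(package_name):
  # True if the package's metadata is visible to pkg_resources here.
  try:
    pkg_resources.get_distribution(package_name)
    return True
  except pkg_resources.DistributionNotFound:
    return False

print(is_installed('pip'))    # True in any pip-managed environment
print(is_installed('torch'))  # False unless torch happens to be installed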
97 changes: 77 additions & 20 deletions sdks/python/apache_beam/runners/portability/stager.py
@@ -54,6 +54,8 @@
import shutil
import sys
import tempfile
+from distutils.version import StrictVersion
+from typing import Callable
from typing import List
from typing import Optional
from typing import Tuple
@@ -82,6 +84,8 @@
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
+# One of the values a user can pass for --requirements_cache to skip
+# populating the cache during staging.
+SKIP_REQUIREMENTS_CACHE = 'skip'

_LOGGER = logging.getLogger(__name__)

@@ -162,7 +166,7 @@ def create_job_resources(options, # type: PipelineOptions
temp_dir, # type: str
build_setup_args=None, # type: Optional[List[str]]
pypi_requirements=None, # type: Optional[List[str]]
-    populate_requirements_cache=None, # type: Optional[str]
+    populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]]
skip_prestaged_dependencies=False, # type: Optional[bool]
):
"""For internal use only; no backwards-compatibility guarantees.
@@ -197,6 +201,8 @@ def create_job_resources(options, # type: PipelineOptions
resources = [] # type: List[beam_runner_api_pb2.ArtifactInformation]

setup_options = options.view_as(SetupOptions)
+  use_beam_default_container = options.view_as(
+      WorkerOptions).sdk_container_image is None

pickler.set_library(setup_options.pickle_library)

@@ -205,8 +211,8 @@ def create_job_resources(options, # type: PipelineOptions
# if we know we are using a dependency pre-installed sdk container image.
if not skip_prestaged_dependencies:
requirements_cache_path = (
-        os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
-        if setup_options.requirements_cache is None else
+        os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache') if
+        (setup_options.requirements_cache is None) else
setup_options.requirements_cache)
if not os.path.exists(requirements_cache_path):
os.makedirs(requirements_cache_path)
@@ -223,10 +229,20 @@ def create_job_resources(options, # type: PipelineOptions
setup_options.requirements_file, REQUIREMENTS_FILE))
# Populate cache with packages from the requirement file option and
# stage the files in the cache.
-      (
-          populate_requirements_cache if populate_requirements_cache else
-          Stager._populate_requirements_cache)(
-          setup_options.requirements_file, requirements_cache_path)
+      if not use_beam_default_container:
+        _LOGGER.warning(
+            'When using a custom container image, prefer installing '
+            'additional PyPI dependencies directly into the image '
+            'instead of specifying them via runtime options '
+            'such as --requirements_file.')
+
+      if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
+        (
+            populate_requirements_cache if populate_requirements_cache else
+            Stager._populate_requirements_cache)(
+            setup_options.requirements_file,
+            requirements_cache_path,
+            setup_options.requirements_cache_only_sources)

if pypi_requirements:
tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
@@ -235,12 +251,16 @@ def create_job_resources(options, # type: PipelineOptions
resources.append(Stager._create_file_pip_requirements_artifact(tf.name))
# Populate cache with packages from PyPI requirements and stage
# the files in the cache.
-      (
-          populate_requirements_cache if populate_requirements_cache else
-          Stager._populate_requirements_cache)(
-          tf.name, requirements_cache_path)
+      if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
+        (
+            populate_requirements_cache if populate_requirements_cache else
+            Stager._populate_requirements_cache)(
+            tf.name,
+            requirements_cache_path,
+            setup_options.requirements_cache_only_sources)

-    if setup_options.requirements_file is not None or pypi_requirements:
+    if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) and (
+        setup_options.requirements_file is not None or pypi_requirements):
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
resources.append(
Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
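Because the populate_requirements_cache hook now receives a third positional argument, any custom callable passed to create_job_resources has to accept it; a minimal sketch of a conforming stub (the name is hypothetical):

def my_populate_cache(requirements_file, cache_dir, populate_cache_with_sdists=False):
  # type: (str, str, bool) -> None
  # Hypothetical no-op populator matching Callable[[str, str, bool], None];
  # a real implementation would download packages into cache_dir.
  pass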
@@ -398,7 +418,7 @@ def create_and_stage_job_resources(
build_setup_args=None, # type: Optional[List[str]]
temp_dir=None, # type: Optional[str]
pypi_requirements=None, # type: Optional[List[str]]
-      populate_requirements_cache=None, # type: Optional[str]
+      populate_requirements_cache=None, # type: Optional[Callable[[str, str, bool], None]]
staging_location=None # type: Optional[str]
):
"""For internal use only; no backwards-compatibility guarantees.
@@ -644,7 +664,7 @@ def _get_python_executable():
return python_bin

@staticmethod
-  def remove_dependency_from_requirements(
+  def _remove_dependency_from_requirements(
requirements_file, # type: str
dependency_to_remove, # type: str
temp_directory_path):
@@ -663,10 +683,31 @@ def remove_dependency_from_requirements(

return tmp_requirements_filename

+  @staticmethod
+  def _get_platform_for_default_sdk_container():
+    """Returns the platform tag for the Apache Beam SDK container,
+    based on the installed pip version.
+
+    Note: pip is still expected to download a compatible wheel of a package
+    with the manylinux1 platform tag if the package on PyPI doesn't
+    have manylinux2014 (or manylinux2010) wheels.
+    Reference: https://www.python.org/dev/peps/pep-0599/#id21
+    """
+    # TODO(anandinguva): When https://github.com/pypa/pip/issues/10760 is
+    # addressed, download the wheel based on the glibc version in Beam's
+    # Python base image.
+    pip_version = pkg_resources.get_distribution('pip').version
+    if StrictVersion(pip_version) >= StrictVersion('19.3'):
+      return 'manylinux2014_x86_64'
+    else:
+      return 'manylinux2010_x86_64'

@staticmethod
@retry.with_exponential_backoff(
num_retries=4, retry_filter=retry_on_non_zero_exit)
-  def _populate_requirements_cache(requirements_file, cache_dir):
+  def _populate_requirements_cache(
+      requirements_file, cache_dir, populate_cache_with_sdists=False):
# The 'pip download' command will not download again if it finds the
# tarball with the proper version already present.
# It will get the packages downloaded in the order they are presented in
@@ -675,10 +716,11 @@ def _populate_requirements_cache(requirements_file, cache_dir):
# The apache-beam dependency is excluded from requirements cache population
# because we stage the SDK separately.
with tempfile.TemporaryDirectory() as temp_directory:
-      tmp_requirements_filepath = Stager.remove_dependency_from_requirements(
+      tmp_requirements_filepath = Stager._remove_dependency_from_requirements(
requirements_file=requirements_file,
dependency_to_remove='apache-beam',
temp_directory_path=temp_directory)

cmd_args = [
Stager._get_python_executable(),
'-m',
@@ -690,10 +732,25 @@ def _populate_requirements_cache(requirements_file, cache_dir):
tmp_requirements_filepath,
'--exists-action',
'i',
-          # Download from PyPI source distributions.
-          '--no-binary',
-          ':all:'
+          '--no-deps'
]

+      if populate_cache_with_sdists:
+        cmd_args.extend(['--no-binary', ':all:'])
+      else:
+        language_implementation_tag = 'cp'
+        abi_suffix = 'm' if sys.version_info < (3, 8) else ''
+        abi_tag = 'cp%d%d%s' % (
+            sys.version_info[0], sys.version_info[1], abi_suffix)
+        platform_tag = Stager._get_platform_for_default_sdk_container()
+        cmd_args.extend([
+            '--implementation',
+            language_implementation_tag,
+            '--abi',
+            abi_tag,
+            '--platform',
+            platform_tag
+        ])
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)

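Putting the pieces together, the pip invocation assembled above is equivalent to the following sketch (the cache path is the default temp-dir location; the tags assume Python 3.8 and pip >= 19.3):

# Default (wheel) mode: fetch cp38 wheels for the manylinux2014_x86_64
# platform, without resolving transitive dependencies (--no-deps).
cmd_args = [
    'python', '-m', 'pip', 'download',
    '--dest', '/tmp/dataflow-requirements-cache',
    '-r', 'requirements.txt',  # with any apache-beam entry already removed
    '--exists-action', 'i',
    '--no-deps',
    '--implementation', 'cp',
    '--abi', 'cp38',
    '--platform', 'manylinux2014_x86_64',
]

# With --requirements_cache_only_sources, the implementation/abi/platform
# options are replaced by: cmd_args.extend(['--no-binary', ':all:'])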