
[BEAM-4032] Download whl for a known target platform on the container image, else download sources #16448

Status: Closed. Wants to merge 6 commits.
181 changes: 139 additions & 42 deletions sdks/python/apache_beam/runners/portability/stager.py
@@ -54,6 +54,7 @@
import shutil
import sys
import tempfile
from typing import Callable
from typing import List
from typing import Optional
from typing import Tuple
@@ -162,7 +163,7 @@ def create_job_resources(options, # type: PipelineOptions
temp_dir, # type: str
build_setup_args=None, # type: Optional[List[str]]
pypi_requirements=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type: Optional[str]
populate_requirements_cache=None, # type: Optional[Callable]
Contributor:
Thanks. Let's include argument types. I think it's:

Suggested change
populate_requirements_cache=None, # type: Optional[Callable]
populate_requirements_cache=None, # type: Optional[Callable[[str, str], None]]
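
For illustration, a user-supplied callback compatible with the suggested type might look like the sketch below (hypothetical, not part of this PR; note that after this change the stager also passes a fetch_binary keyword argument, so a third parameter is needed in practice):

def my_populate_requirements_cache(
    requirements_file, cache_dir, fetch_binary=False):
  # type: (str, str, bool) -> None
  # Hypothetical cache populator: copies pre-downloaded packages into
  # cache_dir instead of invoking 'pip download'. The paths below are
  # placeholders.
  import shutil
  for pkg in ['/tmp/prebuilt/pkg_a.whl', '/tmp/prebuilt/pkg_b.tar.gz']:
    shutil.copy(pkg, cache_dir)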

skip_prestaged_dependencies=False, # type: Optional[bool]
):
"""For internal use only; no backwards-compatibility guarantees.
@@ -197,6 +198,9 @@ def create_job_resources(options, # type: PipelineOptions
resources = [] # type: List[beam_runner_api_pb2.ArtifactInformation]

setup_options = options.view_as(SetupOptions)
# True when sdk_container_image is the default Apache Beam image
fetch_binary = (
Contributor:
The code might be a little more self-explanatory with:

Suggested change
fetch_binary = (
has_default_container_image = (

Then, below, we can pass fetch_binary=has_default_container_image into the function.

setup_options.view_as(WorkerOptions).sdk_container_image is None)

# We can skip boot dependencies: apache beam sdk, python packages from
# requirements.txt, python packages from extra_packages and workflow tarball
@@ -221,10 +225,16 @@ def create_job_resources(options, # type: PipelineOptions
setup_options.requirements_file, REQUIREMENTS_FILE))
# Populate cache with packages from the requirement file option and
# stage the files in the cache.
if not fetch_binary: # display warning.
Contributor:
The inline comment is not necessary; the code is self-explanatory in this case. Btw, https://stackoverflow.blog/2021/12/23/best-practices-for-writing-code-comments/ is a good read. Rule 1 applies here.

_LOGGER.warning(
'Avoid using requirements.txt when using a '
'custom container image.')
Contributor:
Wording suggestion for the message:

When using a custom container image, prefer installing additional PyPI dependencies directly into the image, instead of specifying them via runtime options, such as --requirements_file.

(
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache)(
setup_options.requirements_file, requirements_cache_path)
setup_options.requirements_file,
requirements_cache_path,
fetch_binary=fetch_binary)

if pypi_requirements:
tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
@@ -236,7 +246,7 @@ def create_job_resources(options, # type: PipelineOptions
(
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache)(
tf.name, requirements_cache_path)
tf.name, requirements_cache_path, fetch_binary=fetch_binary)

if setup_options.requirements_file is not None or pypi_requirements:
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
@@ -352,7 +362,6 @@ def create_job_resources(options, # type: PipelineOptions
resources.append(
Stager._create_file_stage_to_artifact(
dataflow_worker_jar, jar_staged_filename))

return resources

def stage_job_resources(self,
@@ -394,7 +403,7 @@ def create_and_stage_job_resources(
build_setup_args=None, # type: Optional[List[str]]
temp_dir=None, # type: Optional[str]
pypi_requirements=None, # type: Optional[List[str]]
populate_requirements_cache=None, # type: Optional[str]
populate_requirements_cache=None, # type: Optional[Callable]
staging_location=None # type: Optional[str]
):
"""For internal use only; no backwards-compatibility guarantees.
@@ -640,12 +649,13 @@ def _get_python_executable():
return python_bin

@staticmethod
def remove_dependency_from_requirements(
def _remove_dependency_from_requirements(
requirements_file, # type: str
dependency_to_remove, # type: str
temp_directory_path):
"""Function to remove dependencies from a given requirements file."""
# read all the dependency names
requirements_to_install = []
with open(requirements_file, 'r') as f:
lines = f.readlines()

@@ -656,25 +666,41 @@ def remove_dependency_from_requirements(
for i in range(len(lines)):
if not lines[i].startswith(dependency_to_remove):
tf.write(lines[i])
requirements_to_install.append(lines[i].strip())

return tmp_requirements_filename
return tmp_requirements_filename, requirements_to_install

@staticmethod
@retry.with_exponential_backoff(
num_retries=4, retry_filter=retry_on_non_zero_exit)
def _populate_requirements_cache(requirements_file, cache_dir):
# The 'pip download' command will not download again if it finds the
# tarball with the proper version already present.
# It will get the packages downloaded in the order they are presented in
# the requirements file and will download package dependencies.

# The apache-beam dependency is excluded from requirements cache population
# because we stage the SDK separately.
def _populate_requirements_cache(requirements_file, # type: str
cache_dir, # type: str
fetch_binary=False):
Contributor:
To reduce code complexity, I think we can omit the fetch_binary param from this function. Currently, the only case where the fetch_binary=False branch is used is custom-container users who pass a requirements file. I think it's OK to change the behavior for these users as well, so that in this case we download sources one-by-one from the requirements.txt file, as opposed to downloading sources from the entire requirements file.
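
A minimal sketch of that simplification (an assumption about the proposed direction, not code from this PR) would drop the flag and download sources per requirement, mirroring the per-package wheel branch below:

# Sketch, assuming fetch_binary is removed: download the source
# distribution for each requirement individually.
for requirement in requirements_to_install:
  cmd_args = [
      Stager._get_python_executable(),
      '-m', 'pip', 'download',
      '--dest', cache_dir,
      '--no-binary', ':all:',
      requirement,
  ]
  processes.check_output(cmd_args, stderr=processes.STDOUT)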

"""
The 'pip download' command will not download again if it finds
the tarball with the proper version already present.
It will get the packages downloaded in the order they are presented in
the requirements file and will download package dependencies
to the cache_dir.

Args:
requirements_file: A file with PyPI dependencies.
cache_dir: Directory into which the dependencies are downloaded.
fetch_binary: If enabled, download wheels (whls). Supported only for
the 'linux_x86_64' platform.
"""

# The apache-beam dependency is excluded from requirements cache population
# because we stage the SDK separately.
with tempfile.TemporaryDirectory() as temp_directory:
tmp_requirements_filepath = Stager.remove_dependency_from_requirements(
tmp_requirements_filepath, requirements_to_install = (
Stager._remove_dependency_from_requirements(
requirements_file=requirements_file,
dependency_to_remove='apache-beam',
temp_directory_path=temp_directory)
)
if not fetch_binary:
# Download only sources. If no source is found, this fails.
cmd_args = [
Stager._get_python_executable(),
'-m',
Expand All @@ -692,6 +718,46 @@ def _populate_requirements_cache(requirements_file, cache_dir):
]
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)
else: # try to download wheels
language_implementation_tag = 'cp'
language_version_tag = '%d%d' % (
sys.version_info[0], sys.version_info[1]) # Python version
abi_suffix = 'm' if sys.version_info < (
3, 8) else '' # ABI suffix to use for the whl
abi_tag = 'cp%d%d%s' % (
sys.version_info[0], sys.version_info[1], abi_suffix
) # ABI tag to use
# install each package individually
Contributor:
Add an explanation of why installing each package individually is necessary in this case.

for requirement in requirements_to_install:
try:
# download the bdist compatible with the debian platform
# TODO(anandinguva): the platform tag will get updated. Check PEP 600
tvalentyn (Contributor), Jan 18, 2022:
From offline discussion:

  • We can rewrite the TODO to make it more actionable (do X when Y is fixed).
  • _download_pypi_package currently does not do the download, but builds the command line.
  • We can likely use the same method for _download_pypi_package and _download_pypi_sdk_package if the download params are the same.

cmd_args = Stager._download_pypi_package(
package_name=requirement,
temp_dir=cache_dir,
fetch_binary=True,
language_version_tag=language_version_tag,
language_implementation_tag=language_implementation_tag,
abi_tag=abi_tag,
other_flags=[
'--exists-action',
'i',
])
except: # pylint: disable=bare-except
# Download the source package
_LOGGER.info(
'No whl was found for the package %s, downloading source' %
requirement)
cmd_args = Stager._download_pypi_package(
package_name=requirement,
temp_dir=cache_dir,
fetch_binary=False,
other_flags=[
'--exists-action',
'i',
])
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)

@staticmethod
def _build_setup_package(setup_file, # type: str
@@ -824,32 +890,17 @@ def _download_pypi_sdk_package(
raise RuntimeError(
Contributor:
Let's remove the arguments that can be inferred (language_version, impl tag, abi tag, platform) from the function signature
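
A rough sketch of that suggestion (an assumption, not code from this PR): infer the tags from the running interpreter inside the function, mirroring the logic added to _populate_requirements_cache above:

import sys

def _infer_interpreter_tags():
  # Hypothetical helper: derive implementation, version and ABI tags
  # from the interpreter submitting the job.
  language_version_tag = '%d%d' % (sys.version_info[0], sys.version_info[1])
  abi_suffix = 'm' if sys.version_info < (3, 8) else ''
  abi_tag = 'cp%s%s' % (language_version_tag, abi_suffix)
  return 'cp', language_version_tag, abi_tag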

'Please set --sdk_location command-line option '
'or install a valid {} distribution.'.format(package_name))
cmd_args = [
Stager._get_python_executable(),
'-m',
'pip',
'download',
'--dest',
temp_dir,
'%s==%s' % (package_name, version),
'--no-deps'
]

cmd_args = Stager._download_pypi_package(
temp_dir=temp_dir,
fetch_binary=fetch_binary,
language_version_tag=language_version_tag,
language_implementation_tag=language_implementation_tag,
abi_tag=abi_tag,
platform_tag=platform_tag,
package_name='%s==%s' % (package_name, version),
other_flags=['--no-deps'])
if fetch_binary:
_LOGGER.info('Downloading binary distribution of the SDK from PyPi')
# Get a wheel distribution for the SDK from PyPI.
cmd_args.extend([
'--only-binary',
':all:',
'--python-version',
language_version_tag,
'--implementation',
language_implementation_tag,
'--abi',
abi_tag,
'--platform',
platform_tag
])
# Example wheel: apache_beam-2.4.0-cp27-cp27mu-manylinux1_x86_64.whl
expected_files = [
os.path.join(
Expand All @@ -863,8 +914,6 @@ def _download_pypi_sdk_package(
platform_tag))
]
else:
_LOGGER.info('Downloading source distribution of the SDK from PyPi')
cmd_args.extend(['--no-binary', ':all:'])
expected_files = [
os.path.join(temp_dir, '%s-%s.zip' % (package_name, version)),
os.path.join(temp_dir, '%s-%s.tar.gz' % (package_name, version))
Expand All @@ -884,3 +933,51 @@ def _download_pypi_sdk_package(
'Failed to download a distribution for the running SDK. '
'Expected either one of %s to be found in the download folder.' %
(expected_files))

@staticmethod
def _download_pypi_package(
temp_dir,
fetch_binary=False,
language_version_tag='27',
language_implementation_tag='cp',
abi_tag='cp27mu',
platform_tag='manylinux2014_x86_64',
Contributor:
Let's create a helper function that returns the target platform, and check the pip version in that helper. For pip >= 19.3 let's use the manylinux2014_x86_64 wheel, otherwise use the manylinux2010_x86_64 wheel, and add a TODO: when pypa/pip#10760 is addressed, download the wheel based on the glibc version in Beam's Python SDK base image.
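
A sketch of that helper (hedged: checking pip.__version__ is an assumption; the 19.3 threshold and the wheel tags come from the comment above):

import pip

def _get_platform_tag():
  # Hypothetical helper per the review suggestion: pip >= 19.3 is
  # required to resolve manylinux2014 wheel tags, so fall back to
  # manylinux2010 for older pip.
  # TODO: when https://github.com/pypa/pip/issues/10760 is addressed,
  # pick the tag from the glibc version in Beam's Python SDK base image.
  pip_version = tuple(int(x) for x in pip.__version__.split('.')[:2])
  if pip_version >= (19, 3):
    return 'manylinux2014_x86_64'
  return 'manylinux2010_x86_64'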

package_name=None,
Contributor:
(optional) It may make sense to move this param earlier in the argument list

other_flags=None):
"""Downloads SDK package from PyPI and returns path to local path."""
if other_flags is None:
other_flags = []
package_name = package_name or Stager.get_sdk_package_name()
cmd_args = [
Stager._get_python_executable(),
'-m',
'pip',
'download',
'--dest',
temp_dir,
package_name,
]

if other_flags:
cmd_args.extend(other_flags) # flags like --no-deps

if fetch_binary:
_LOGGER.info(
'Downloading binary distribution of the %s from PyPi' % package_name)
# Get a wheel distribution for the package from PyPI.
cmd_args.extend([
'--only-binary',
':all:',
'--python-version',
language_version_tag,
'--implementation',
language_implementation_tag,
'--abi',
abi_tag,
'--platform',
platform_tag
])
else:
cmd_args.extend(['--no-binary', ':all:'])

return cmd_args
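
For reference, a hypothetical invocation (the tag and package values below are examples only) returns the pip command line rather than performing a download:

cmd_args = Stager._download_pypi_package(
    temp_dir='/tmp/cache',  # placeholder directory
    fetch_binary=True,
    language_version_tag='37',
    language_implementation_tag='cp',
    abi_tag='cp37m',
    platform_tag='manylinux2014_x86_64',
    package_name='numpy==1.21.0',  # placeholder package pin
    other_flags=['--no-deps'])
# cmd_args is now approximately:
# [<python>, '-m', 'pip', 'download', '--dest', '/tmp/cache',
#  'numpy==1.21.0', '--no-deps', '--only-binary', ':all:',
#  '--python-version', '37', '--implementation', 'cp',
#  '--abi', 'cp37m', '--platform', 'manylinux2014_x86_64']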
47 changes: 44 additions & 3 deletions sdks/python/apache_beam/runners/portability/stager_test.py
@@ -36,6 +36,7 @@
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners.internal import names
from apache_beam.runners.portability import stager

@@ -86,7 +87,8 @@ def file_copy(self, from_path, to_path):
else:
shutil.copyfile(from_path, to_path)

def populate_requirements_cache(self, requirements_file, cache_dir):
def populate_requirements_cache(
self, requirements_file, cache_dir, fetch_binary=False):
_ = requirements_file
self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
@@ -690,15 +692,15 @@ def test_remove_dependency_from_requirements(self):
for i in range(len(requirements)):
f.write(requirements[i])

tmp_req_filename = self.stager.remove_dependency_from_requirements(
tmp_req_filename, _ = self.stager._remove_dependency_from_requirements(
requirements_file=os.path.join(requirements_cache_dir, 'abc.txt'),
dependency_to_remove='apache_beam',
temp_directory_path=requirements_cache_dir)
with open(tmp_req_filename, 'r') as tf:
lines = tf.readlines()
self.assertEqual(['avro-python3\n', 'fastavro\n', 'numpy\n'], sorted(lines))

tmp_req_filename = self.stager.remove_dependency_from_requirements(
tmp_req_filename, _ = self.stager._remove_dependency_from_requirements(
requirements_file=os.path.join(requirements_cache_dir, 'abc.txt'),
dependency_to_remove='fastavro',
temp_directory_path=requirements_cache_dir)
@@ -708,6 +710,45 @@ def test_remove_dependency_from_requirements(self):
self.assertEqual(['apache_beam\n', 'avro-python3\n', 'numpy\n'],
sorted(lines))

def test_download_source_or_whl_using_fetch_binary(self):
staging_dir = self.make_temp_dir()
requirements_cache_dir = self.make_temp_dir()
source_dir = self.make_temp_dir()

options = PipelineOptions()
self.update_options(options)

options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
options.view_as(SetupOptions).requirements_file = os.path.join(
source_dir, stager.REQUIREMENTS_FILE)
self.create_temp_file(
os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
# For the default container image, the sdk_container_image option is None.
options.view_as(WorkerOptions).sdk_container_image = None

def _create_file(requirements_file, temp_dir, fetch_binary):
if fetch_binary:
self.create_temp_file(os.path.join(temp_dir, 'nothing.whl'), 'Fake whl')
else:
self.create_temp_file(
os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')

with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._populate_requirements_cache',
staticmethod(_create_file)):
resources = self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1]
# in the resources, only a whl should be present
for f in resources:
self.assertTrue('.tar.gz' not in f)

options.view_as(WorkerOptions).sdk_container_image = 'fake docker URL'
resources = self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1]

self.assertTrue('nothing.tar.gz' in resources)


class TestStager(stager.Stager):
def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):