-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-4032]Download whl for a known target platform on the container image else download sources #16448
[BEAM-4032]Download whl for a known target platform on the container image else download sources #16448
Changes from all commits
4aa4922
e50d40f
84a7241
a884877
7f5d721
468b1a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -54,6 +54,7 @@ | |||||
import shutil | ||||||
import sys | ||||||
import tempfile | ||||||
from typing import Callable | ||||||
from typing import List | ||||||
from typing import Optional | ||||||
from typing import Tuple | ||||||
|
@@ -162,7 +163,7 @@ def create_job_resources(options, # type: PipelineOptions | |||||
temp_dir, # type: str | ||||||
build_setup_args=None, # type: Optional[List[str]] | ||||||
pypi_requirements=None, # type: Optional[List[str]] | ||||||
populate_requirements_cache=None, # type: Optional[str] | ||||||
populate_requirements_cache=None, # type: Optional[Callable] | ||||||
skip_prestaged_dependencies=False, # type: Optional[bool] | ||||||
): | ||||||
"""For internal use only; no backwards-compatibility guarantees. | ||||||
|
@@ -197,6 +198,9 @@ def create_job_resources(options, # type: PipelineOptions | |||||
resources = [] # type: List[beam_runner_api_pb2.ArtifactInformation] | ||||||
|
||||||
setup_options = options.view_as(SetupOptions) | ||||||
# True when sdk_container_image is apache beam image | ||||||
fetch_binary = ( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The code might be a little more self-explanatory with:
Suggested change
then, we can use below: |
||||||
setup_options.view_as(WorkerOptions).sdk_container_image is None) | ||||||
|
||||||
# We can skip boot dependencies: apache beam sdk, python packages from | ||||||
# requirements.txt, python packages from extra_packages and workflow tarball | ||||||
|
@@ -221,10 +225,16 @@ def create_job_resources(options, # type: PipelineOptions | |||||
setup_options.requirements_file, REQUIREMENTS_FILE)) | ||||||
# Populate cache with packages from the requirement file option and | ||||||
# stage the files in the cache. | ||||||
if not fetch_binary: # display warning. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The inline comment is not necessary; the code is self-explanatory in this case. Btw, https://stackoverflow.blog/2021/12/23/best-practices-for-writing-code-comments/ is a good read. Rule 1 applies here. |
||||||
_LOGGER.warning( | ||||||
'Avoid using requirements.txt when using a ' | ||||||
'custom container image.') | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Wording suggestion for the message: When using a custom container image, prefer installing additional PyPI dependencies directly into the image, instead of specifying them via runtime options, such as --requirements_file. |
||||||
( | ||||||
populate_requirements_cache if populate_requirements_cache else | ||||||
Stager._populate_requirements_cache)( | ||||||
setup_options.requirements_file, requirements_cache_path) | ||||||
setup_options.requirements_file, | ||||||
requirements_cache_path, | ||||||
fetch_binary=fetch_binary) | ||||||
|
||||||
if pypi_requirements: | ||||||
tf = tempfile.NamedTemporaryFile(mode='w', delete=False) | ||||||
|
@@ -236,7 +246,7 @@ def create_job_resources(options, # type: PipelineOptions | |||||
( | ||||||
populate_requirements_cache if populate_requirements_cache else | ||||||
Stager._populate_requirements_cache)( | ||||||
tf.name, requirements_cache_path) | ||||||
tf.name, requirements_cache_path, fetch_binary=fetch_binary) | ||||||
|
||||||
if setup_options.requirements_file is not None or pypi_requirements: | ||||||
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')): | ||||||
|
@@ -352,7 +362,6 @@ def create_job_resources(options, # type: PipelineOptions | |||||
resources.append( | ||||||
Stager._create_file_stage_to_artifact( | ||||||
dataflow_worker_jar, jar_staged_filename)) | ||||||
|
||||||
return resources | ||||||
|
||||||
def stage_job_resources(self, | ||||||
|
@@ -394,7 +403,7 @@ def create_and_stage_job_resources( | |||||
build_setup_args=None, # type: Optional[List[str]] | ||||||
temp_dir=None, # type: Optional[str] | ||||||
pypi_requirements=None, # type: Optional[List[str]] | ||||||
populate_requirements_cache=None, # type: Optional[str] | ||||||
populate_requirements_cache=None, # type: Optional[Callable] | ||||||
AnandInguva marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
staging_location=None # type: Optional[str] | ||||||
): | ||||||
"""For internal use only; no backwards-compatibility guarantees. | ||||||
|
@@ -640,12 +649,13 @@ def _get_python_executable(): | |||||
return python_bin | ||||||
|
||||||
@staticmethod | ||||||
def remove_dependency_from_requirements( | ||||||
def _remove_dependency_from_requirements( | ||||||
AnandInguva marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
requirements_file, # type: str | ||||||
dependency_to_remove, # type: str | ||||||
temp_directory_path): | ||||||
"""Function to remove dependencies from a given requirements file.""" | ||||||
# read all the dependency names | ||||||
requirements_to_install = [] | ||||||
with open(requirements_file, 'r') as f: | ||||||
lines = f.readlines() | ||||||
|
||||||
|
@@ -656,25 +666,41 @@ def remove_dependency_from_requirements( | |||||
for i in range(len(lines)): | ||||||
if not lines[i].startswith(dependency_to_remove): | ||||||
tf.write(lines[i]) | ||||||
requirements_to_install.append(lines[i].strip()) | ||||||
|
||||||
return tmp_requirements_filename | ||||||
return tmp_requirements_filename, requirements_to_install | ||||||
|
||||||
@staticmethod | ||||||
@retry.with_exponential_backoff( | ||||||
num_retries=4, retry_filter=retry_on_non_zero_exit) | ||||||
def _populate_requirements_cache(requirements_file, cache_dir): | ||||||
# The 'pip download' command will not download again if it finds the | ||||||
# tarball with the proper version already present. | ||||||
# It will get the packages downloaded in the order they are presented in | ||||||
# the requirements file and will download package dependencies. | ||||||
|
||||||
# The apache-beam dependency is excluded from requirements cache population | ||||||
# because we stage the SDK separately. | ||||||
def _populate_requirements_cache(requirements_file, # type: str | ||||||
cache_dir, # type: str | ||||||
fetch_binary=False): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To reduce code complexity, I think we can omit |
||||||
""" | ||||||
The 'pip download' command will not download again if it finds | ||||||
the tarball with the proper version already present. | ||||||
It will get the packages downloaded in the order they are presented in | ||||||
the requirements file and will download package dependencies | ||||||
to the cache_dir. | ||||||
|
||||||
Args: | ||||||
requirements_file: A file with PyPI dependencies. | ||||||
cache_dir: Cache to download deps | ||||||
fetch_binary: Download whls if this flag is enabled. | ||||||
Supported for 'linux_x86_64' architecture. | ||||||
""" | ||||||
|
||||||
# The apache-beam dependency is excluded from requirements cache population | ||||||
# because we stage the SDK separately. | ||||||
with tempfile.TemporaryDirectory() as temp_directory: | ||||||
tmp_requirements_filepath = Stager.remove_dependency_from_requirements( | ||||||
tmp_requirements_filepath, requirements_to_install = ( | ||||||
Stager._remove_dependency_from_requirements( | ||||||
requirements_file=requirements_file, | ||||||
dependency_to_remove='apache-beam', | ||||||
temp_directory_path=temp_directory) | ||||||
) | ||||||
if not fetch_binary: | ||||||
# download only sources. if no source found, this fails. | ||||||
cmd_args = [ | ||||||
Stager._get_python_executable(), | ||||||
'-m', | ||||||
|
@@ -692,6 +718,46 @@ def _populate_requirements_cache(requirements_file, cache_dir): | |||||
] | ||||||
_LOGGER.info('Executing command: %s', cmd_args) | ||||||
processes.check_output(cmd_args, stderr=processes.STDOUT) | ||||||
else: # try to download wheels | ||||||
language_implementation_tag = 'cp' | ||||||
language_version_tag = '%d%d' % ( | ||||||
sys.version_info[0], sys.version_info[1]) # Python version | ||||||
abi_suffix = 'm' if sys.version_info < ( | ||||||
3, 8) else '' # ABI suffix to use for the whl | ||||||
abi_tag = 'cp%d%d%s' % ( | ||||||
sys.version_info[0], sys.version_info[1], abi_suffix | ||||||
) # ABI tag to use | ||||||
# install each package individually | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add an explanation of why installing each package individually is necessary in this case. |
||||||
for requirement in requirements_to_install: | ||||||
try: | ||||||
# download the bdist compatible with the debian platform | ||||||
# TODO(anandinguva): the platform tag will get updated. Check PEP 600 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. From offline discussion:
|
||||||
cmd_args = Stager._download_pypi_package( | ||||||
package_name=requirement, | ||||||
temp_dir=cache_dir, | ||||||
fetch_binary=True, | ||||||
language_version_tag=language_version_tag, | ||||||
language_implementation_tag=language_implementation_tag, | ||||||
abi_tag=abi_tag, | ||||||
other_flags=[ | ||||||
'--exists-action', | ||||||
'i', | ||||||
]) | ||||||
except: # pylint: disable=bare-except | ||||||
# Download the source package | ||||||
_LOGGER.info( | ||||||
'No whl was found for the package %s, downloading source' % | ||||||
requirement) | ||||||
cmd_args = Stager._download_pypi_package( | ||||||
package_name=requirement, | ||||||
temp_dir=cache_dir, | ||||||
fetch_binary=False, | ||||||
other_flags=[ | ||||||
'--exists-action', | ||||||
'i', | ||||||
]) | ||||||
_LOGGER.info('Executing command: %s', cmd_args) | ||||||
processes.check_output(cmd_args, stderr=processes.STDOUT) | ||||||
|
||||||
@staticmethod | ||||||
def _build_setup_package(setup_file, # type: str | ||||||
|
@@ -824,32 +890,17 @@ def _download_pypi_sdk_package( | |||||
raise RuntimeError( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's remove the arguments that can be inferred (language_version, impl tag, abi tag, platform) from the function signature. |
||||||
'Please set --sdk_location command-line option ' | ||||||
'or install a valid {} distribution.'.format(package_name)) | ||||||
cmd_args = [ | ||||||
Stager._get_python_executable(), | ||||||
'-m', | ||||||
'pip', | ||||||
'download', | ||||||
'--dest', | ||||||
temp_dir, | ||||||
'%s==%s' % (package_name, version), | ||||||
'--no-deps' | ||||||
] | ||||||
|
||||||
cmd_args = Stager._download_pypi_package( | ||||||
temp_dir=temp_dir, | ||||||
fetch_binary=fetch_binary, | ||||||
language_version_tag=language_version_tag, | ||||||
language_implementation_tag=language_implementation_tag, | ||||||
abi_tag=abi_tag, | ||||||
platform_tag=platform_tag, | ||||||
package_name='%s==%s' % (package_name, version), | ||||||
other_flags=['--no-deps']) | ||||||
if fetch_binary: | ||||||
_LOGGER.info('Downloading binary distribution of the SDK from PyPi') | ||||||
# Get a wheel distribution for the SDK from PyPI. | ||||||
cmd_args.extend([ | ||||||
'--only-binary', | ||||||
':all:', | ||||||
'--python-version', | ||||||
language_version_tag, | ||||||
'--implementation', | ||||||
language_implementation_tag, | ||||||
'--abi', | ||||||
abi_tag, | ||||||
'--platform', | ||||||
platform_tag | ||||||
]) | ||||||
# Example wheel: apache_beam-2.4.0-cp27-cp27mu-manylinux1_x86_64.whl | ||||||
expected_files = [ | ||||||
os.path.join( | ||||||
|
@@ -863,8 +914,6 @@ def _download_pypi_sdk_package( | |||||
platform_tag)) | ||||||
] | ||||||
else: | ||||||
_LOGGER.info('Downloading source distribution of the SDK from PyPi') | ||||||
cmd_args.extend(['--no-binary', ':all:']) | ||||||
expected_files = [ | ||||||
os.path.join(temp_dir, '%s-%s.zip' % (package_name, version)), | ||||||
os.path.join(temp_dir, '%s-%s.tar.gz' % (package_name, version)) | ||||||
|
@@ -884,3 +933,51 @@ def _download_pypi_sdk_package( | |||||
'Failed to download a distribution for the running SDK. ' | ||||||
'Expected either one of %s to be found in the download folder.' % | ||||||
(expected_files)) | ||||||
|
||||||
@staticmethod
def _download_pypi_package(
    temp_dir,  # type: str
    fetch_binary=False,  # type: bool
    language_version_tag='27',  # type: str
    language_implementation_tag='cp',  # type: str
    abi_tag='cp27mu',  # type: str
    platform_tag='manylinux2014_x86_64',  # type: str
    package_name=None,  # type: Optional[str]
    other_flags=None  # type: Optional[List[str]]
):
  # type: (...) -> List[str]

  """Builds the ``pip download`` command line for a PyPI package.

  This function only assembles the argument list; it does not run pip.
  The caller is expected to execute the returned command.

  Args:
    temp_dir: Directory passed to pip as the ``--dest`` download target.
    fetch_binary: If True, request a wheel only (``--only-binary :all:``)
      matching the given version/implementation/ABI/platform tags.
      If False, request a source distribution only (``--no-binary :all:``).
    language_version_tag: Value for ``--python-version``, e.g. '27'.
      Used only when fetch_binary is True.
    language_implementation_tag: Value for ``--implementation``, e.g. 'cp'.
      Used only when fetch_binary is True.
    abi_tag: Value for ``--abi``, e.g. 'cp27mu'.
      Used only when fetch_binary is True.
    platform_tag: Value for ``--platform``.
      Used only when fetch_binary is True.
    package_name: Requirement specifier to download. When empty or None,
      falls back to ``Stager.get_sdk_package_name()``.
    other_flags: Extra pip flags (such as ``--no-deps``) inserted right after
      the package name, before the binary/source selection flags.

  Returns:
    The command as a list of strings, starting with the Python executable.
  """
  # NOTE(review): a reviewer suggested moving package_name earlier in the
  # argument list; it is kept in place so positional callers are unaffected.
  # TODO(review): derive platform_tag from a helper that checks the pip
  # version (manylinux2014_x86_64 for pip >= 19.3, otherwise
  # manylinux2010_x86_64). When pypa/pip#10760 is addressed, pick the wheel
  # based on the glibc version in Beam's Python SDK base image.
  package_name = package_name or Stager.get_sdk_package_name()
  cmd_args = [
      Stager._get_python_executable(),
      '-m',
      'pip',
      'download',
      '--dest',
      temp_dir,
      package_name,
  ]

  # Flags like --no-deps; None and an empty list both add nothing.
  cmd_args.extend(other_flags or [])

  if fetch_binary:
    _LOGGER.info(
        'Downloading binary distribution of the %s from PyPi', package_name)
    # pip requires the interpreter/ABI/platform constraints to be paired with
    # --only-binary :all: when targeting a platform other than the local one.
    cmd_args.extend([
        '--only-binary',
        ':all:',
        '--python-version',
        language_version_tag,
        '--implementation',
        language_implementation_tag,
        '--abi',
        abi_tag,
        '--platform',
        platform_tag,
    ])
  else:
    # The special value is ':all:' with a trailing colon. Without it pip
    # treats the argument as a package name and may still fetch wheels.
    cmd_args.extend(['--no-binary', ':all:'])

  return cmd_args
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Let's include argument types. I think it's: